mirror of
https://github.com/GoodStartLabs/AI_Diplomacy.git
synced 2026-04-19 12:58:09 +00:00
1347 lines
No EOL
60 KiB
Python
1347 lines
No EOL
60 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Analyze Key Game Moments: Betrayals, Collaborations, and Playing Both Sides
|
|
|
|
This script analyzes Diplomacy game data to identify the most interesting strategic moments.
|
|
Enhanced with:
|
|
- More stringent rating criteria
|
|
- Integration of power diary entries for better context
|
|
- Analysis of well-executed strategies and strategic mistakes
|
|
"""
|
|
|
|
import json
|
|
import asyncio
|
|
import argparse
|
|
import logging
|
|
import csv
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Any
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
# Import the client from ai_diplomacy module
|
|
from ai_diplomacy.clients import load_model_client
|
|
|
|
# Run the dotenv loader once at import time, before anything else in the module.
load_dotenv()

# Configure logging: INFO and above, each record prefixed with time and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-wide logger used by every function and method below.
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class GameMoment:
    """One noteworthy strategic event found in a single phase of the game.

    Plain data holder: values are stored as given, without validation.
    """
    # Name of the phase the event was found in
    phase: str
    # One of: BETRAYAL, COLLABORATION, PLAYING_BOTH_SIDES, BRILLIANT_STRATEGY, STRATEGIC_BLUNDER
    category: str
    # Powers taking part in the event
    powers_involved: List[str]
    # What was promised, agreed or intended
    promise_agreement: str
    # What was actually done
    actual_action: str
    # Strategic consequence for the game
    impact: str
    # Interest rating of the event
    interest_score: float
    # Messages of the phase, unprocessed
    raw_messages: List[Dict]
    # Orders of the phase, unprocessed
    raw_orders: Dict
    # Diary text per power for the phase
    diary_context: Dict[str, str]
|
|
|
|
@dataclass
class Lie:
    """A promise made in a message that the sender's orders did not honour.

    Plain data holder: values are stored as given, without validation.
    """
    # Name of the phase in which the promise was made
    phase: str
    # Power that sent the message
    liar: str
    # Power the message was addressed to
    recipient: str
    # Text of the promise as found in the message
    promise: str
    # Diary excerpt related to the promise
    diary_intent: str
    # Description of what the orders show instead
    actual_action: str
    # True when the diary suggests the deception was planned
    intentional: bool
    # Short human-readable justification of the classification
    explanation: str
|
|
|
|
class GameAnalyzer:
|
|
"""Analyzes Diplomacy game data for key strategic moments"""
|
|
|
|
def __init__(self, results_folder: str, model_name: str = "openrouter-google/gemini-2.5-flash-preview"):
|
|
self.results_folder = Path(results_folder)
|
|
self.game_data_path = self.results_folder / "lmvsgame.json"
|
|
self.overview_path = self.results_folder / "overview.jsonl"
|
|
self.csv_path = self.results_folder / "llm_responses.csv"
|
|
self.model_name = model_name
|
|
self.client = None
|
|
self.game_data = None
|
|
self.power_to_model = None
|
|
self.moments = []
|
|
self.diary_entries = {} # phase -> power -> diary content
|
|
self.invalid_moves_by_model = {} # Initialize attribute
|
|
self.lies = [] # Track detected lies
|
|
self.lies_by_model = {} # model -> {intentional: count, unintentional: count}
|
|
|
|
async def initialize(self):
    """Load every input the analysis needs and create the model client.

    Reads the game JSON, takes the power-to-model mapping from the second
    line of ``overview.jsonl`` (an empty mapping plus a warning when that
    line is absent), parses diary entries and invalid-move counts from the
    responses CSV, and finally builds the client with ``load_model_client``.

    Raises:
        OSError: If the game JSON or the overview file cannot be opened.
        json.JSONDecodeError: If either of them holds malformed JSON.
    """
    # Explicit UTF-8 so decoding does not depend on the platform locale;
    # the CSV fallback reader in this class opens its file the same way.
    with open(self.game_data_path, 'r', encoding='utf-8') as f:
        self.game_data = json.load(f)

    # Only the second line of the overview is used, so read just two lines
    # instead of pulling the whole file into memory.
    with open(self.overview_path, 'r', encoding='utf-8') as f:
        f.readline()
        mapping_line = f.readline()

    if mapping_line:
        self.power_to_model = json.loads(mapping_line)
        logger.info(f"Loaded power-to-model mapping: {self.power_to_model}")
    else:
        logger.warning("Could not find power-to-model mapping in overview.jsonl")
        self.power_to_model = {}

    # Load diary entries from CSV
    self.diary_entries = self.parse_llm_responses_csv()
    logger.info(f"Loaded diary entries for {len(self.diary_entries)} phases")

    # Load invalid moves data from CSV
    self.invalid_moves_by_model = self.parse_invalid_moves_from_csv()
    logger.info(f"Loaded invalid moves for {len(self.invalid_moves_by_model)} models")

    # Initialize model client
    self.client = load_model_client(self.model_name)
    logger.info(f"Initialized with model: {self.model_name}")
|
|
|
|
def parse_llm_responses_csv(self) -> Dict[str, Dict[str, str]]:
|
|
"""Parse the CSV file to extract diary entries by phase and power"""
|
|
diary_entries = {}
|
|
|
|
try:
|
|
import pandas as pd
|
|
# Use pandas for more robust CSV parsing
|
|
df = pd.read_csv(self.csv_path)
|
|
|
|
# Filter for negotiation diary entries
|
|
diary_df = df[df['response_type'] == 'negotiation_diary']
|
|
|
|
for _, row in diary_df.iterrows():
|
|
phase = row['phase']
|
|
power = row['power']
|
|
raw_response = str(row['raw_response']).strip()
|
|
|
|
if phase not in diary_entries:
|
|
diary_entries[phase] = {}
|
|
|
|
try:
|
|
# Try to parse as JSON first
|
|
response = json.loads(raw_response)
|
|
diary_content = f"Negotiation Summary: {response.get('negotiation_summary', 'N/A')}\n"
|
|
diary_content += f"Intent: {response.get('intent', 'N/A')}\n"
|
|
relationships = response.get('updated_relationships', {})
|
|
if isinstance(relationships, dict):
|
|
diary_content += f"Relationships: {relationships}"
|
|
else:
|
|
diary_content += f"Relationships: {relationships}"
|
|
diary_entries[phase][power] = diary_content
|
|
except (json.JSONDecodeError, TypeError):
|
|
# If JSON parsing fails, use a simplified version or skip
|
|
if raw_response and raw_response.lower() not in ['null', 'nan', 'none']:
|
|
diary_entries[phase][power] = f"Raw diary: {raw_response}"
|
|
|
|
logger.info(f"Successfully parsed {len(diary_entries)} phases with diary entries")
|
|
return diary_entries
|
|
|
|
except ImportError:
|
|
# Fallback to standard CSV if pandas not available
|
|
logger.info("Pandas not available, using standard CSV parsing")
|
|
import csv
|
|
|
|
with open(self.csv_path, 'r', encoding='utf-8') as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
try:
|
|
if row.get('response_type') == 'negotiation_diary':
|
|
phase = row.get('phase', '')
|
|
power = row.get('power', '')
|
|
|
|
if phase and power:
|
|
if phase not in diary_entries:
|
|
diary_entries[phase] = {}
|
|
|
|
raw_response = row.get('raw_response', '').strip()
|
|
|
|
try:
|
|
# Try to parse as JSON
|
|
response = json.loads(raw_response)
|
|
diary_content = f"Negotiation Summary: {response.get('negotiation_summary', 'N/A')}\n"
|
|
diary_content += f"Intent: {response.get('intent', 'N/A')}\n"
|
|
diary_content += f"Relationships: {response.get('updated_relationships', 'N/A')}"
|
|
diary_entries[phase][power] = diary_content
|
|
except (json.JSONDecodeError, TypeError):
|
|
if raw_response and raw_response != "null":
|
|
diary_entries[phase][power] = f"Raw diary: {raw_response}"
|
|
except Exception as e:
|
|
continue # Skip problematic rows
|
|
|
|
return diary_entries
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error parsing CSV file: {e}")
|
|
return {}
|
|
|
|
def parse_invalid_moves_from_csv(self) -> Dict[str, int]:
|
|
"""Parse the CSV file to count invalid moves by model"""
|
|
invalid_moves_by_model = {}
|
|
|
|
try:
|
|
import pandas as pd
|
|
# Use pandas for more robust CSV parsing
|
|
df = pd.read_csv(self.csv_path)
|
|
|
|
# Look for failures in the success column
|
|
failure_df = df[df['success'].str.contains('Failure: Invalid LLM Moves', na=False)]
|
|
|
|
for _, row in failure_df.iterrows():
|
|
model = row['model']
|
|
success_text = str(row['success'])
|
|
|
|
# Extract the number from "Failure: Invalid LLM Moves (N):"
|
|
import re
|
|
match = re.search(r'Invalid LLM Moves \((\d+)\)', success_text)
|
|
if match:
|
|
invalid_count = int(match.group(1))
|
|
if model not in invalid_moves_by_model:
|
|
invalid_moves_by_model[model] = 0
|
|
invalid_moves_by_model[model] += invalid_count
|
|
|
|
logger.info(f"Successfully parsed invalid moves for {len(invalid_moves_by_model)} models")
|
|
return invalid_moves_by_model
|
|
|
|
except ImportError:
|
|
# Fallback to standard CSV if pandas not available
|
|
logger.info("Pandas not available, using standard CSV parsing for invalid moves")
|
|
import csv
|
|
import re
|
|
|
|
with open(self.csv_path, 'r', encoding='utf-8') as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
try:
|
|
success_text = row.get('success', '')
|
|
if 'Failure: Invalid LLM Moves' in success_text:
|
|
model = row.get('model', '')
|
|
match = re.search(r'Invalid LLM Moves \((\d+)\)', success_text)
|
|
if match and model:
|
|
invalid_count = int(match.group(1))
|
|
if model not in invalid_moves_by_model:
|
|
invalid_moves_by_model[model] = 0
|
|
invalid_moves_by_model[model] += invalid_count
|
|
except Exception as e:
|
|
continue # Skip problematic rows
|
|
|
|
return invalid_moves_by_model
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error parsing invalid moves from CSV file: {e}")
|
|
return {}
|
|
|
|
def extract_turn_data(self, phase_data: Dict) -> Dict:
|
|
"""Extract relevant data from a single turn/phase"""
|
|
phase_name = phase_data.get("name", "")
|
|
|
|
# Get diary entries for this phase
|
|
phase_diaries = self.diary_entries.get(phase_name, {})
|
|
|
|
return {
|
|
"phase": phase_name,
|
|
"messages": phase_data.get("messages", []),
|
|
"orders": phase_data.get("orders", {}),
|
|
"summary": phase_data.get("summary", ""),
|
|
"statistical_summary": phase_data.get("statistical_summary", {}),
|
|
"diaries": phase_diaries
|
|
}
|
|
|
|
def create_analysis_prompt(self, turn_data: Dict) -> str:
|
|
"""Create the analysis prompt for a single turn"""
|
|
# Format messages for analysis
|
|
formatted_messages = []
|
|
for msg in turn_data.get("messages", []):
|
|
sender = msg.get('sender', 'Unknown')
|
|
sender_model = self.power_to_model.get(sender, '')
|
|
sender_str = f"{sender} ({sender_model})" if sender_model else sender
|
|
|
|
recipient = msg.get('recipient', 'Unknown')
|
|
recipient_model = self.power_to_model.get(recipient, '')
|
|
recipient_str = f"{recipient} ({recipient_model})" if recipient_model else recipient
|
|
|
|
formatted_messages.append(
|
|
f"{sender_str} to {recipient_str}: {msg.get('message', '')}"
|
|
)
|
|
|
|
# Format orders for analysis
|
|
formatted_orders = []
|
|
for power, power_orders in turn_data.get("orders", {}).items():
|
|
power_model = self.power_to_model.get(power, '')
|
|
power_str = f"{power} ({power_model})" if power_model else power
|
|
formatted_orders.append(f"{power_str}: {power_orders}")
|
|
|
|
# Format diary entries
|
|
formatted_diaries = []
|
|
for power, diary in turn_data.get("diaries", {}).items():
|
|
power_model = self.power_to_model.get(power, '')
|
|
power_str = f"{power} ({power_model})" if power_model else power
|
|
formatted_diaries.append(f"{power_str} DIARY:\n{diary}")
|
|
|
|
prompt = f"""You are analyzing diplomatic negotiations and subsequent military orders from a Diplomacy game. Your task is to identify key strategic moments in the following categories:
|
|
|
|
1. BETRAYAL: When a power explicitly promises one action but takes a contradictory action
|
|
2. COLLABORATION: When powers successfully coordinate as agreed
|
|
3. PLAYING_BOTH_SIDES: When a power makes conflicting promises to different parties
|
|
4. BRILLIANT_STRATEGY: Exceptionally well-executed strategic maneuvers that gain significant advantage
|
|
5. STRATEGIC_BLUNDER: Major strategic mistakes that significantly weaken a power's position
|
|
|
|
IMPORTANT SCORING GUIDELINES:
|
|
- Scores 1-3: Minor or routine diplomatic events
|
|
- Scores 4-6: Significant but expected diplomatic maneuvers
|
|
- Scores 7-8: Notable strategic moments with clear impact
|
|
- Scores 9-10: EXCEPTIONAL moments that are truly dramatic or game-changing
|
|
|
|
Reserve high scores (8+) for:
|
|
- Major betrayals that fundamentally shift alliances
|
|
- Successful coordinated attacks on major powers
|
|
- Clever deceptions that fool multiple powers
|
|
- Brilliant strategic maneuvers that dramatically improve position
|
|
- Catastrophic strategic errors with lasting consequences
|
|
- Actions that dramatically alter the game's balance
|
|
|
|
For this turn ({turn_data.get('phase', '')}), analyze:
|
|
|
|
PRIVATE DIARY ENTRIES (Powers' internal thoughts):
|
|
{chr(10).join(formatted_diaries) if formatted_diaries else 'No diary entries available'}
|
|
|
|
MESSAGES:
|
|
{chr(10).join(formatted_messages) if formatted_messages else 'No messages this turn'}
|
|
|
|
ORDERS:
|
|
{chr(10).join(formatted_orders) if formatted_orders else 'No orders this turn'}
|
|
|
|
TURN SUMMARY:
|
|
{turn_data.get('summary', 'No summary available')}
|
|
|
|
Identify ALL instances that fit the five categories. For each instance provide:
|
|
{{
|
|
"category": "BETRAYAL" or "COLLABORATION" or "PLAYING_BOTH_SIDES" or "BRILLIANT_STRATEGY" or "STRATEGIC_BLUNDER",
|
|
"powers_involved": ["POWER1", "POWER2", ...],
|
|
"promise_agreement": "What was promised/agreed/intended (or strategy attempted)",
|
|
"actual_action": "What actually happened",
|
|
"impact": "Strategic impact on the game",
|
|
"interest_score": 6.5 // 1-10 scale, be STRICT with high scores
|
|
}}
|
|
|
|
Use the diary entries to verify:
|
|
- Whether actions align with stated intentions
|
|
- Hidden motivations behind diplomatic moves
|
|
- Contradictions between public promises and private plans
|
|
- Strategic planning and its execution
|
|
|
|
Return your response as a JSON array of detected moments. If no relevant moments are found, return an empty array [].
|
|
|
|
Focus on:
|
|
- Comparing diary intentions vs actual orders
|
|
- Explicit promises vs actual orders
|
|
- Coordinated attacks or defenses
|
|
- DMZ violations
|
|
- Support promises kept or broken
|
|
- Conflicting negotiations with different powers
|
|
- Clever strategic positioning
|
|
- Missed strategic opportunities
|
|
- Tactical errors that cost supply centers
|
|
"""
|
|
return prompt
|
|
|
|
async def analyze_turn(self, phase_data: Dict) -> List["GameMoment"]:
    """Ask the model for the key moments of one phase and wrap them as GameMoment objects.

    Args:
        phase_data: One phase record of the game data.

    Returns:
        The moments reported for the phase. The list is empty when the
        phase has neither messages nor orders, when the model call or the
        JSON decoding fails, or when the reply is not a JSON array.
        Array items that are not objects are skipped, and a missing or
        non-numeric ``interest_score`` falls back to 5, so one malformed
        item does not discard the other moments of the phase.
    """
    turn_data = self.extract_turn_data(phase_data)

    # Skip if no meaningful data
    if not turn_data["messages"] and not turn_data["orders"]:
        return []

    prompt = self.create_analysis_prompt(turn_data)

    try:
        response = await self.client.generate_response(prompt)

        # The reply may be wrapped in a fenced code block; keep only its content.
        if "```json" in response:
            response = response.split("```json")[1].split("```")[0]
        elif "```" in response:
            response = response.split("```")[1].split("```")[0]

        detected_moments = json.loads(response)
    except Exception as e:
        # Boundary handler: client and decoding failures must not stop the other phases.
        logger.error(f"Error analyzing turn {turn_data.get('phase', '')}: {e}")
        return []

    if not isinstance(detected_moments, list):
        logger.error(
            f"Error analyzing turn {turn_data.get('phase', '')}: "
            f"expected a JSON array, got {type(detected_moments).__name__}"
        )
        return []

    # Enrich with raw data
    moments = []
    for moment in detected_moments:
        if not isinstance(moment, dict):
            logger.warning(f"Skipping malformed moment in {turn_data['phase']}: {moment!r}")
            continue

        try:
            score = float(moment.get("interest_score", 5))
        except (TypeError, ValueError):
            # Same value that is used when the field is absent.
            score = 5.0

        game_moment = GameMoment(
            phase=turn_data["phase"],
            category=moment.get("category", ""),
            powers_involved=moment.get("powers_involved", []),
            promise_agreement=moment.get("promise_agreement", ""),
            actual_action=moment.get("actual_action", ""),
            impact=moment.get("impact", ""),
            interest_score=score,
            raw_messages=turn_data["messages"],
            raw_orders=turn_data["orders"],
            diary_context=turn_data["diaries"]
        )
        moments.append(game_moment)
        logger.info(f"Detected {game_moment.category} in {game_moment.phase} "
                    f"(score: {game_moment.interest_score})")

    return moments
|
|
|
|
def detect_lies_in_phase(self, phase_data: Dict) -> List[Lie]:
|
|
"""Detect lies by comparing messages, diary entries, and actual orders"""
|
|
phase_name = phase_data.get("name", "")
|
|
messages = phase_data.get("messages", [])
|
|
orders = phase_data.get("orders", {})
|
|
diaries = self.diary_entries.get(phase_name, {})
|
|
|
|
detected_lies = []
|
|
|
|
# Group messages by sender
|
|
messages_by_sender = {}
|
|
for msg in messages:
|
|
sender = msg.get('sender', '')
|
|
if sender not in messages_by_sender:
|
|
messages_by_sender[sender] = []
|
|
messages_by_sender[sender].append(msg)
|
|
|
|
# Analyze each power's messages against their diary and orders
|
|
for sender, sent_messages in messages_by_sender.items():
|
|
sender_diary = diaries.get(sender, '')
|
|
sender_orders = orders.get(sender, [])
|
|
|
|
for msg in sent_messages:
|
|
recipient = msg.get('recipient', '')
|
|
message_text = msg.get('message', '')
|
|
|
|
# Extract promises from message using keywords
|
|
promises = self.extract_promises_from_message(message_text)
|
|
|
|
for promise in promises:
|
|
# Check if promise was kept
|
|
lie_detected = self.check_promise_against_orders(
|
|
promise, sender_orders, sender_diary,
|
|
sender, recipient, phase_name
|
|
)
|
|
if lie_detected:
|
|
detected_lies.append(lie_detected)
|
|
|
|
return detected_lies
|
|
|
|
def extract_promises_from_message(self, message: str) -> List[Dict]:
    """Extract order-like promises from a message using keyword patterns.

    Matching is done on the lower-cased message, so captured details are
    lower case. Single-letter order tokens (``a``, ``f``, ``h``) must
    stand alone as words, so ordinary prose such as "we have" is not read
    as a hold order. A statement of intent to attack ("will attack X") is
    not treated as a promise to refrain from attacking.

    Args:
        message: Text of one diplomatic message.

    Returns:
        One dict per pattern match, in pattern order, with the keys
        ``type``, ``details`` (captured groups), ``full_match``, ``start``,
        ``end`` and ``context`` (up to 50 characters either side of the
        match, taken from the original message).
    """
    import re

    message_lower = message.lower()

    # Common promise patterns - more specific to Diplomacy
    promise_patterns = [
        # Support promises
        (r'(?:i )?will support (?:your )?(\w+)(?:/\w+)? (?:to|into|-) (\w+)', 'support'),
        (r'(?:my )?(\w+) (?:will )?s(?:upport)?s? (?:your )?(\w+)(?:/\w+)?(?:\s+)?(?:to|into|-)(?:\s+)?(\w+)', 'support'),
        (r'\ba (\w+) s a (\w+)(?:\s+)?(?:-|to)(?:\s+)?(\w+)', 'support'),
        (r'\bf (\w+) s (?:a |f )?(\w+)(?:\s+)?(?:-|to)(?:\s+)?(\w+)', 'support'),
        # Movement promises
        (r'(?:i )?will (?:move|order) (?:my )?(\w+) to (\w+)', 'move'),
        (r'\ba (\w+)(?:\s+)?(?:->|-)(?:\s+)?(\w+)', 'move'),
        (r'\bf (\w+)(?:\s+)?(?:->|-)(?:\s+)?(\w+)', 'move'),
        (r'(\w+) (?:moves?|going) to (\w+)', 'move'),
        # Hold promises
        (r'\b(?:will )?hold (?:in )?(\w+)', 'hold'),
        (r'(\w+) (?:will )?h(?:old)?s?\b', 'hold'),
        (r'\ba (\w+) h\b', 'hold'),
        (r'\bf (\w+) h\b', 'hold'),
        # No attack promises (only negated forms: "will attack" is the opposite)
        (r'(?:won\'t |will not )attack (\w+)', 'no_attack'),
        (r'no (?:moves?|attacks?) (?:on|against|to) (\w+)', 'no_attack'),
        (r'stay(?:ing)? out of (\w+)', 'no_attack'),
        # DMZ promises
        (r'dmz (?:in |on |for )?(\w+)', 'dmz'),
        (r'(\w+) (?:will be|stays?|remains?) dmz', 'dmz'),
        (r'demilitari[sz]ed? (?:zone )?(?:in |on )?(\w+)', 'dmz'),
        # Specific coordination
        (r'(?:agree|agreed) (?:to |on )?(.+)', 'agreement'),
        (r'(?:promise|commit) (?:to |that )?(.+)', 'promise'),
    ]

    promises = []
    for pattern, promise_type in promise_patterns:
        for match in re.finditer(pattern, message_lower):
            # Extract context around the promise from the original-case text
            context_start = max(0, match.start() - 50)
            context_end = min(len(message), match.end() + 50)
            promises.append({
                'type': promise_type,
                'details': match.groups(),
                'full_match': match.group(0),
                'start': match.start(),
                'end': match.end(),
                'context': message[context_start:context_end],
            })

    return promises
|
|
|
|
def check_promise_against_orders(self, promise: Dict, actual_orders: List[str],
|
|
diary: str, sender: str, recipient: str,
|
|
phase: str) -> Optional[Lie]:
|
|
"""Check if a promise was kept by comparing against actual orders"""
|
|
promise_type = promise['type']
|
|
details = promise['details']
|
|
|
|
# Normalize orders for comparison
|
|
orders_str = ' '.join(actual_orders).lower()
|
|
orders_lower = [order.lower() for order in actual_orders]
|
|
|
|
lie_detected = False
|
|
actual_action = ""
|
|
|
|
# More sophisticated checking based on promise type
|
|
if promise_type == 'support':
|
|
# Check if support was given
|
|
if len(details) >= 3:
|
|
unit_loc = details[0] # Location of supporting unit
|
|
supported_loc = details[1] # Location being supported from
|
|
target_loc = details[2] # Target location
|
|
|
|
# Check various support formats
|
|
support_patterns = [
|
|
f"{unit_loc} s {supported_loc} - {target_loc}",
|
|
f"{unit_loc} s {supported_loc}-{target_loc}",
|
|
f"{unit_loc} support {supported_loc} - {target_loc}",
|
|
f"s {supported_loc} - {target_loc}" # Sometimes location is implicit
|
|
]
|
|
|
|
found_support = False
|
|
for pattern in support_patterns:
|
|
if pattern in orders_str:
|
|
found_support = True
|
|
break
|
|
|
|
if not found_support:
|
|
lie_detected = True
|
|
actual_action = f"No support from {unit_loc} for {supported_loc}-{target_loc} found in orders: {', '.join(actual_orders)}"
|
|
|
|
elif promise_type == 'move':
|
|
if len(details) >= 2:
|
|
unit = details[0]
|
|
destination = details[1]
|
|
|
|
# Check various move formats
|
|
move_patterns = [
|
|
f"{unit} - {destination}",
|
|
f"{unit}-{destination}",
|
|
f"a {unit} - {destination}",
|
|
f"f {unit} - {destination}"
|
|
]
|
|
|
|
found_move = False
|
|
for pattern in move_patterns:
|
|
if pattern in orders_str:
|
|
found_move = True
|
|
break
|
|
|
|
if not found_move:
|
|
# Check what the unit actually did
|
|
unit_action = "Unknown"
|
|
for order in orders_lower:
|
|
if unit in order:
|
|
unit_action = order
|
|
break
|
|
|
|
lie_detected = True
|
|
actual_action = f"Unit at {unit} did not move to {destination}. Actual order: {unit_action}"
|
|
|
|
elif promise_type == 'hold':
|
|
if len(details) >= 1:
|
|
unit = details[0]
|
|
|
|
hold_patterns = [
|
|
f"{unit} h",
|
|
f"a {unit} h",
|
|
f"f {unit} h",
|
|
f"{unit} hold"
|
|
]
|
|
|
|
found_hold = False
|
|
for pattern in hold_patterns:
|
|
if pattern in orders_str:
|
|
found_hold = True
|
|
break
|
|
|
|
if not found_hold:
|
|
# Check what the unit actually did
|
|
unit_action = "Unknown"
|
|
for order in orders_lower:
|
|
if unit in order:
|
|
unit_action = order
|
|
break
|
|
|
|
lie_detected = True
|
|
actual_action = f"Unit at {unit} did not hold. Actual order: {unit_action}"
|
|
|
|
elif promise_type == 'no_attack':
|
|
if len(details) >= 1:
|
|
target = details[0]
|
|
|
|
# Check if any unit attacked the target
|
|
attack_patterns = [
|
|
f"- {target}",
|
|
f"-{target}",
|
|
f"to {target}",
|
|
f"into {target}"
|
|
]
|
|
|
|
for pattern in attack_patterns:
|
|
if pattern in orders_str:
|
|
# Find which unit attacked
|
|
attacking_unit = "Unknown"
|
|
for order in orders_lower:
|
|
if pattern in order:
|
|
attacking_unit = order
|
|
break
|
|
|
|
lie_detected = True
|
|
actual_action = f"Attacked {target} despite promise not to. Order: {attacking_unit}"
|
|
break
|
|
|
|
if lie_detected:
|
|
# Determine if intentional based on diary
|
|
intentional = self.check_if_lie_intentional(promise, diary, actual_action)
|
|
|
|
return Lie(
|
|
phase=phase,
|
|
liar=sender,
|
|
recipient=recipient,
|
|
promise=f"{promise['full_match']} (context: {promise.get('context', '')})",
|
|
diary_intent=self.extract_relevant_diary_excerpt(diary, promise),
|
|
actual_action=actual_action,
|
|
intentional=intentional,
|
|
explanation="Intentional deception" if intentional else "Possible misunderstanding or changed circumstances"
|
|
)
|
|
|
|
return None
|
|
|
|
def check_if_lie_intentional(self, promise: Dict, diary: str, actual_action: str) -> bool:
    """Decide from the diary text whether a broken promise looks planned.

    The checks run in this order, and the first hit returns ``True``:

    1. The diary contains a deception indicator. Indicators must start at
       a word boundary, so "allied to" is not read as "lied to" and
       "establish" / "stable" are not read as "stab".
    2. The diary contradicts the promise in a type-specific way (for
       example it mentions attacking the location that was to be supported).
    3. For a support promise presented as "agreed", the diary's
       negotiation summary also mentions an agreement but does not contain
       the promised order text.
    4. The diary mentions one of the promise details together with a word
       for fooling someone.

    Args:
        promise: Promise dict with ``type``, ``details`` and ``full_match``
            (optionally ``context``).
        diary: Diary text of the sender; empty text yields ``False``.
        actual_action: Description of the contradicting order (unused).

    Returns:
        ``True`` when the diary suggests deliberate deception.
    """
    import re

    if not diary:
        return False

    diary_lower = diary.lower()

    # Indicators of intentional deception. Apart from "stab" they match as
    # word prefixes ("trick" also matches "tricked"). "stab" is limited to
    # its own inflections because "stable" and "stability" are common.
    deception_regex = (
        r"\b(?:mislead|deceive|trick|false|pretend|while actually|but will|"
        r"instead will|make them think|let them believe|deliberately|"
        r"stab(?:s|bed|bing)?\b|betray|double-cross|feign|lied to)"
    )
    if re.search(deception_regex, diary_lower):
        return True

    promise_type = promise['type']
    promise_text = promise['full_match']
    promise_context = promise.get('context', '').lower()

    # Look for specific contradictions based on promise type
    if promise_type == 'support':
        if len(promise['details']) >= 2:
            target = promise['details'][1]
            # Check if diary mentions NOT supporting or attacking instead
            if (f"not support {target}" in diary_lower
                    or f"attack {target}" in diary_lower
                    or "will not help" in diary_lower):
                return True

    elif promise_type == 'no_attack':
        target = promise['details'][0] if promise['details'] else ''
        if target and (f"attack {target}" in diary_lower
                       or f"move to {target}" in diary_lower
                       or f"take {target}" in diary_lower):
            return True

    elif promise_type in ('move', 'hold'):
        # A change of plans counts unless the diary says it was forced
        if 'different plan' in diary_lower or 'change of plans' in diary_lower:
            if 'forced to' not in diary_lower and 'had to' not in diary_lower:
                return True

    # Locate the negotiation summary. Diaries assembled by this class use
    # the label "Negotiation Summary"; raw diaries may carry the JSON key.
    summary_start = diary_lower.find('negotiation summary')
    if summary_start == -1:
        summary_start = diary_lower.find('negotiation_summary')

    if summary_start != -1:
        intent_pos = diary_lower.find('intent:', summary_start)
        # Without an intent marker the summary runs to the end of the diary.
        summary_end = intent_pos if intent_pos != -1 else len(diary_lower)
        summary_section = diary_lower[summary_start:summary_end]

        # An "agreed" support that the summary's own agreement does not mention
        if (promise_type == 'support' and 'agreed' in promise_context
                and 'agreed' in summary_section
                and promise_text not in summary_section):
            return True

    # Additional check: a promise detail appears in the diary next to a word for fooling
    detail_mentioned = any(
        detail and detail.lower() in diary_lower
        for detail in promise.get('details', ())
    )
    if detail_mentioned and re.search(r"\b(?:trick|fool|deceive|mislead)", diary_lower):
        return True

    return False
|
|
|
|
def extract_relevant_diary_excerpt(self, diary: str, promise: Dict) -> str:
    """Pick the diary sentences most related to a promise.

    The words of the promise text serve as keywords. Tokens shorter than
    three characters (unit letters such as ``a`` / ``f`` / ``s`` and the
    ``-`` separator) are ignored, because they occur in nearly every
    sentence and would make every sentence look relevant.

    Args:
        diary: Diary text of the sender (may be empty).
        promise: Promise dict; only ``full_match`` is read.

    Returns:
        ``"No diary entry"`` for an empty diary. Otherwise up to two
        matching sentences joined by ``". "``, or, when no sentence
        matches, the first 100 characters of the diary (followed by
        ``...`` when the diary is longer than that).
    """
    if not diary:
        return "No diary entry"

    keywords = [word for word in promise['full_match'].lower().split() if len(word) >= 3]

    # Sentences are split on full stops; keep those that mention a keyword
    relevant = [
        sentence.strip()
        for sentence in diary.split('.')
        if any(keyword in sentence.lower() for keyword in keywords)
    ]

    if relevant:
        return '. '.join(relevant[:2])  # Return up to 2 relevant sentences

    # Return first 100 chars if no specific match
    return diary[:100] + "..." if len(diary) > 100 else diary
|
|
|
|
async def analyze_game(self, max_phases: Optional[int] = None, max_concurrent: int = 5):
|
|
"""Analyze the entire game for key moments with concurrent processing
|
|
|
|
Args:
|
|
max_phases: Maximum number of phases to analyze (None = all)
|
|
max_concurrent: Maximum number of concurrent phase analyses
|
|
"""
|
|
phases = self.game_data.get("phases", [])
|
|
|
|
if max_phases is not None:
|
|
phases = phases[:max_phases]
|
|
logger.info(f"Analyzing first {len(phases)} phases (out of {len(self.game_data.get('phases', []))} total)...")
|
|
else:
|
|
logger.info(f"Analyzing {len(phases)} phases...")
|
|
|
|
# Process phases in batches to avoid overwhelming the API
|
|
all_moments = []
|
|
|
|
for i in range(0, len(phases), max_concurrent):
|
|
batch = phases[i:i + max_concurrent]
|
|
batch_start = i + 1
|
|
batch_end = min(i + max_concurrent, len(phases))
|
|
|
|
logger.info(f"Processing batch {batch_start}-{batch_end} of {len(phases)} phases...")
|
|
|
|
# Create tasks for concurrent processing
|
|
tasks = []
|
|
for j, phase_data in enumerate(batch):
|
|
phase_name = phase_data.get("name", f"Phase {i+j}")
|
|
logger.info(f"Starting analysis of phase {phase_name}")
|
|
task = self.analyze_turn(phase_data)
|
|
tasks.append(task)
|
|
|
|
# Wait for all tasks in this batch to complete
|
|
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# Process results and handle any exceptions
|
|
for j, result in enumerate(batch_results):
|
|
if isinstance(result, Exception):
|
|
phase_name = batch[j].get("name", f"Phase {i+j}")
|
|
logger.error(f"Error analyzing phase {phase_name}: {result}")
|
|
else:
|
|
all_moments.extend(result)
|
|
|
|
# Small delay between batches to be respectful to the API
|
|
if i + max_concurrent < len(phases):
|
|
logger.info(f"Batch complete. Waiting 2 seconds before next batch...")
|
|
await asyncio.sleep(2)
|
|
|
|
self.moments = all_moments
|
|
|
|
# Analyze lies separately
|
|
logger.info("Analyzing diplomatic lies...")
|
|
for phase_data in phases:
|
|
phase_lies = self.detect_lies_in_phase(phase_data)
|
|
self.lies.extend(phase_lies)
|
|
|
|
# Count lies by model
|
|
for lie in self.lies:
|
|
liar_model = self.power_to_model.get(lie.liar, 'Unknown')
|
|
if liar_model not in self.lies_by_model:
|
|
self.lies_by_model[liar_model] = {'intentional': 0, 'unintentional': 0}
|
|
|
|
if lie.intentional:
|
|
self.lies_by_model[liar_model]['intentional'] += 1
|
|
else:
|
|
self.lies_by_model[liar_model]['unintentional'] += 1
|
|
|
|
# Sort moments by interest score
|
|
self.moments.sort(key=lambda m: m.interest_score, reverse=True)
|
|
|
|
logger.info(f"Analysis complete. Found {len(self.moments)} key moments and {len(self.lies)} lies.")
|
|
|
|
def format_power_with_model(self, power: str) -> str:
|
|
"""Format power name with model in parentheses"""
|
|
model = self.power_to_model.get(power, '')
|
|
return f"{power} ({model})" if model else power
|
|
|
|
def phase_sort_key(self, phase_name):
    """Build a sort key for phase names such as 'S1901M' or 'F1901M'.

    The name is read as one season letter, four year characters and a
    phase-type suffix. The key is ``(year, season_rank, phase_type)``
    with Spring (S) = 1, Fall (F) = 2, Winter (W) = 3 and 0 for any other
    letter; a year part that is not all digits counts as 0. Names that
    are empty or shorter than six characters, and names that cannot be
    taken apart, give ``(0, 0, "")``.
    """
    fallback = (0, 0, "")

    if not phase_name or len(phase_name) < 6:
        return fallback

    try:
        year_part = phase_name[1:5]  # 1901, 1902, etc.
        year = int(year_part) if year_part.isdigit() else 0

        # Order: Spring (S) < Fall (F) < Winter (W)
        season_rank = {"S": 1, "F": 2, "W": 3}.get(phase_name[0], 0)

        # The remaining suffix is the phase type: M, A, R
        return (year, season_rank, phase_name[5:])
    except Exception:
        return fallback
|
|
|
|
async def generate_narrative(self) -> str:
    """Generate a narrative story of the game using phase summaries and top moments.

    The prompt sent to ``self.client.generate_response`` is assembled from:

    * the power/model roster in ``self.power_to_model``;
    * every phase summary in ``self.game_data["phases"]``, ordered with
      ``self.phase_sort_key``;
    * the diaries of the first three diary phases and, when more than three
      exist, of up to the last two phases that are not already among those
      first three (so no phase is listed twice);
    * the five highest-scoring moments of each category, listed chronologically.

    Returns:
        The client's response, or a fixed fallback message if the request raises.
    """
    # Phase summaries, chronologically ordered.
    summarized = [
        (phase.get("name", ""), phase.get("summary", ""))
        for phase in self.game_data.get("phases", [])
        if phase.get("summary")
    ]
    summarized.sort(key=lambda item: self.phase_sort_key(item[0]))
    phase_summaries = [f"{name}: {summary}" for name, summary in summarized]

    power_lines = [
        f"- {power} ({model})"
        for power, model in sorted(self.power_to_model.items())
    ]

    def diary_section(phase):
        """Render one phase's diaries as a 'Phase X:' block ending in a blank line."""
        lines = [f"Phase {phase}:\n"]
        lines.extend(
            f"- {self.format_power_with_model(power)}: {diary}\n"
            for power, diary in sorted(self.diary_entries[phase].items())
        )
        lines.append("\n")
        return "".join(lines)

    diary_phases = sorted(self.diary_entries, key=self.phase_sort_key)
    early_phases = diary_phases[:3]  # early intentions
    # Endgame context: the last two phases, but never one already shown above
    # (with exactly four phases a plain [-2:] would repeat the third phase).
    late_phases = diary_phases[max(3, len(diary_phases) - 2):]

    # Top five moments per category, then ordered chronologically.
    key_moments = []
    for category in ["BETRAYAL", "COLLABORATION", "PLAYING_BOTH_SIDES", "BRILLIANT_STRATEGY", "STRATEGIC_BLUNDER"]:
        in_category = [m for m in self.moments if m.category == category]
        in_category.sort(key=lambda m: m.interest_score, reverse=True)
        key_moments.extend(in_category[:5])
    key_moments.sort(key=lambda m: self.phase_sort_key(m.phase))

    # The prompt is built from explicit pieces rather than indented
    # triple-quoted blocks so its text cannot pick up stray indentation.
    prompt_parts = [
        "You are a master war historian writing a dramatic chronicle of a Diplomacy game. "
        "Transform the comprehensive game record below into a single, gripping narrative "
        "of betrayal, alliance, and conquest.\n"
        "\n"
        "THE COMPETING POWERS (always refer to them as \"Power (Model)\"):\n",
        "\n".join(power_lines),
        "\n"
        "\n"
        "COMPLETE GAME RECORD (synthesize all of this into your narrative):\n",
        "\n".join(phase_summaries),
        "\n"
        "\n"
        "IMPORTANT POWER DIARIES (internal thoughts of each power):\n",
    ]
    prompt_parts.extend(diary_section(phase) for phase in early_phases + late_phases)
    prompt_parts.append(
        "\n"
        "KEY DRAMATIC MOMENTS (reference these highlights appropriately):\n"
    )
    for moment in key_moments:
        powers_with_models = [
            f"{p} ({self.power_to_model.get(p, 'Unknown')})"
            for p in moment.powers_involved
        ]
        prompt_parts.append(
            f"{moment.phase} - {moment.category} (Score: {moment.interest_score}/10): "
            f"{', '.join(powers_with_models)}\n"
        )
    prompt_parts.append(
        "\n"
        "CRITICAL INSTRUCTIONS:\n"
        "- Write EXACTLY 1-2 paragraphs that tell the COMPLETE story of the ENTIRE game\n"
        "- This is NOT a summary of each phase - it's ONE flowing narrative of the whole game\n"
        "- Always refer to powers as \"PowerName (ModelName)\" - e.g., \"Germany (o3)\", \"France (o4-mini)\"\n"
        "- Start with how the game began and the initial alliances\n"
        "- Cover the major turning points and dramatic moments\n"
        "- End with how the game concluded and who won\n"
        "- Use dramatic, evocative language but be concise\n"
        "- Focus on the overall arc of the game, not individual phase details\n"
        "\n"
        "Create a single, cohesive narrative that captures the essence of the entire game "
        "from start to finish. Think of it as the opening passage of a history book chapter "
        "about this conflict.\n"
    )
    narrative_prompt = "".join(prompt_parts)

    try:
        return await self.client.generate_response(narrative_prompt)
    except Exception as e:
        # Best effort: the report is still produced without a narrative.
        logger.error(f"Error generating narrative: {e}")
        return "Unable to generate narrative due to an error."
|
|
|
|
async def generate_report(self, output_path: Optional[str] = None):
    """Write a markdown report of the analysis and return its path.

    The report contains, in order: a header, the narrative returned by
    ``self.generate_narrative()``, moment counts per category, the
    interest-score distribution, the power-to-model mapping, invalid-move
    counts per model (only when any were recorded), lie counts per model with
    the first five intentional lies, and the five highest-scoring moments of
    each category together with the involved powers' diary context.

    Args:
        output_path: Destination file. When ``None``, a timestamped file named
            after the results folder is created in a ``game_moments``
            directory next to this script.

    Returns:
        The path the report was written to.
    """
    if output_path is None:
        # Default location: <script dir>/game_moments/<results>_report_<timestamp>.md
        game_moments_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "game_moments")
        os.makedirs(game_moments_dir, exist_ok=True)

        results_name = os.path.basename(os.path.normpath(str(self.results_folder)))
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = os.path.join(game_moments_dir, f"{results_name}_report_{timestamp}.md")

    # Generate the narrative first
    narrative = await self.generate_narrative()

    def in_category(category):
        """Return the moments whose category equals ``category``."""
        return [m for m in self.moments if m.category == category]

    scores = [m.interest_score for m in self.moments]

    report_lines = [
        "# Diplomacy Game Analysis: Key Moments",
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"Game: {self.game_data_path}",
        "",
        "## Game Narrative",
        "",
        narrative,
        "",
        "---",
        "",
        "## Summary",
        f"- Total moments analyzed: {len(self.moments)}",
        f"- Betrayals: {len(in_category('BETRAYAL'))}",
        f"- Collaborations: {len(in_category('COLLABORATION'))}",
        f"- Playing Both Sides: {len(in_category('PLAYING_BOTH_SIDES'))}",
        f"- Brilliant Strategies: {len(in_category('BRILLIANT_STRATEGY'))}",
        f"- Strategic Blunders: {len(in_category('STRATEGIC_BLUNDER'))}",
        "",
        "## Score Distribution",
        f"- Scores 9-10: {sum(1 for s in scores if s >= 9)}",
        f"- Scores 7-8: {sum(1 for s in scores if 7 <= s < 9)}",
        f"- Scores 4-6: {sum(1 for s in scores if 4 <= s < 7)}",
        f"- Scores 1-3: {sum(1 for s in scores if s < 4)}",
        "",
        "## Power Models",
        "",
    ]

    # Power-model mapping
    for power, model in sorted(self.power_to_model.items()):
        report_lines.append(f"- **{power}**: {model}")

    # Invalid moves, most frequent first; section omitted when nothing was recorded
    if self.invalid_moves_by_model:
        report_lines.extend(["", "## Invalid Moves by Model", ""])
        sorted_invalid = sorted(self.invalid_moves_by_model.items(),
                                key=lambda x: x[1], reverse=True)
        for model, count in sorted_invalid:
            report_lines.append(f"- **{model}**: {count} invalid moves")

    # Lies per model, most lies first; models without lies are not listed
    report_lines.extend(["", "## Lies Analysis", "", "### Lies by Model", ""])
    sorted_models = sorted(self.lies_by_model.items(),
                           key=lambda x: x[1]['intentional'] + x[1]['unintentional'],
                           reverse=True)
    for model, counts in sorted_models:
        total = counts['intentional'] + counts['unintentional']
        if total > 0:
            report_lines.append(
                f"- **{model}**: {total} total lies "
                f"({counts['intentional']} intentional, {counts['unintentional']} unintentional)"
            )

    # Examples: the first five intentional lies, in detection order
    if self.lies:
        report_lines.extend(["", "### Notable Lies", ""])
        intentional_lies = [lie for lie in self.lies if lie.intentional]
        for i, lie in enumerate(intentional_lies[:5], 1):
            liar_str = self.format_power_with_model(lie.liar)
            recipient_str = self.format_power_with_model(lie.recipient)
            report_lines.extend([
                f"#### {i}. {lie.phase} - Intentional Deception",
                f"**{liar_str}** to **{recipient_str}**",
                "",
                f"**Promise:** \"{lie.promise}\"",
                "",
                f"**Diary Intent:** {lie.diary_intent}",
                "",
                f"**Actual Action:** {lie.actual_action}",
                "",
            ])

    report_lines.extend(["", "## Key Strategic Moments by Category", ""])

    # (category, heading, description, label for promise_agreement, label for actual_action).
    # All five sections share one layout and differ only in these strings.
    category_sections = [
        ("BETRAYAL", "### Betrayals",
         "_When powers explicitly promised one action but took a contradictory action_",
         "Promise", "Actual Action"),
        ("COLLABORATION", "### Collaborations",
         "_When powers successfully coordinated as agreed_",
         "Agreement", "Action Taken"),
        ("PLAYING_BOTH_SIDES", "### Playing Both Sides",
         "_When a power made conflicting promises to different parties_",
         "Conflicting Promises", "Actual Action"),
        ("BRILLIANT_STRATEGY", "### Brilliant Strategies",
         "_Exceptionally well-executed strategic maneuvers that gained significant advantage_",
         "Strategy", "Execution"),
        ("STRATEGIC_BLUNDER", "### Strategic Blunders",
         "_Major strategic mistakes that significantly weakened a power's position_",
         "Mistaken Strategy", "What Happened"),
    ]

    for category, heading, description, promise_label, action_label in category_sections:
        report_lines.extend([heading, description, ""])

        ranked = sorted(in_category(category), key=lambda m: m.interest_score, reverse=True)
        for i, moment in enumerate(ranked[:5], 1):
            powers_str = ', '.join(self.format_power_with_model(p) for p in moment.powers_involved)
            report_lines.extend([
                f"#### {i}. {moment.phase} (Score: {moment.interest_score}/10)",
                f"**Powers Involved:** {powers_str}",
                "",
                f"**{promise_label}:** {moment.promise_agreement or 'N/A'}",
                "",
                f"**{action_label}:** {moment.actual_action or 'N/A'}",
                "",
                f"**Impact:** {moment.impact or 'N/A'}",
                "",
                "**Diary Context:**",
                "",
            ])

            # Diary entries of the involved powers, where available
            for power in moment.powers_involved:
                if power in moment.diary_context:
                    power_with_model = self.format_power_with_model(power)
                    report_lines.append(f"_{power_with_model} Diary:_ {moment.diary_context[power]}")
                    report_lines.append("")

            report_lines.append("")

    # The narrative and diaries are model-generated text and may contain any
    # Unicode character, so the encoding must not depend on the locale.
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(report_lines))

    logger.info(f"Report generated: {output_path}")
    return output_path
|
|
|
|
def save_json_results(self, output_path: Optional[str] = None):
    """Save all moments, lies and summary metadata as JSON and return the path.

    Each moment is serialised with ``asdict`` minus its ``raw_messages`` and
    ``raw_orders`` fields; diary context is kept in full. The ``metadata``
    section records the generation time, source folder, analysis model, and
    per-category and per-score-band moment counts.

    Args:
        output_path: Destination file. When ``None``, a timestamped file named
            after the results folder is created in a ``game_moments``
            directory next to this script.

    Returns:
        The path the JSON file was written to.
    """
    # Read the clock once so "timestamp" and "generated_at" always describe
    # the same instant.
    now = datetime.now()
    timestamp = now.strftime("%Y%m%d_%H%M%S")

    if output_path is None:
        # Default location: <script dir>/game_moments/<results>_data_<timestamp>.json
        game_moments_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "game_moments")
        os.makedirs(game_moments_dir, exist_ok=True)

        results_name = os.path.basename(os.path.normpath(str(self.results_folder)))
        output_path = os.path.join(game_moments_dir, f"{results_name}_data_{timestamp}.json")

    # Moments without the bulky raw data
    moments_data = []
    for moment in self.moments:
        moment_dict = asdict(moment)
        moment_dict.pop('raw_messages', None)
        moment_dict.pop('raw_orders', None)
        moments_data.append(moment_dict)

    def count_category(category):
        """Count the moments whose category equals ``category``."""
        return sum(1 for m in self.moments if m.category == category)

    scores = [m.interest_score for m in self.moments]

    full_data = {
        "metadata": {
            "timestamp": timestamp,
            "generated_at": now.strftime("%Y-%m-%d %H:%M:%S"),
            "source_folder": str(self.results_folder),
            "analysis_model": self.model_name,
            "total_moments": len(self.moments),
            "moment_categories": {
                "betrayals": count_category("BETRAYAL"),
                "collaborations": count_category("COLLABORATION"),
                "playing_both_sides": count_category("PLAYING_BOTH_SIDES"),
                "brilliant_strategies": count_category("BRILLIANT_STRATEGY"),
                "strategic_blunders": count_category("STRATEGIC_BLUNDER"),
            },
            "score_distribution": {
                "scores_9_10": sum(1 for s in scores if s >= 9),
                "scores_7_8": sum(1 for s in scores if 7 <= s < 9),
                "scores_4_6": sum(1 for s in scores if 4 <= s < 7),
                "scores_1_3": sum(1 for s in scores if s < 4),
            },
        },
        "power_models": self.power_to_model,
        "invalid_moves_by_model": self.invalid_moves_by_model,
        "lies_by_model": self.lies_by_model,
        "moments": moments_data,
        "lies": [asdict(lie) for lie in self.lies],
    }

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(full_data, f, indent=2)

    logger.info(f"JSON results saved: {output_path}")
    return output_path
|
|
|
|
async def main():
    """Command-line entry point: analyze one results folder and write both outputs.

    Parses the CLI arguments, fills in timestamped default output paths inside
    a ``game_moments`` directory next to this script, runs the analysis, writes
    the markdown report and the JSON data, and prints a short summary.

    Any exception raised during analysis or output is logged and re-raised.
    """
    parser = argparse.ArgumentParser(description="Analyze Diplomacy game for key strategic moments")
    parser.add_argument("results_folder", help="Path to the results folder containing lmvsgame.json and overview.jsonl")
    parser.add_argument("--model", default="openrouter-google/gemini-2.5-flash-preview",
                        help="Model to use for analysis")
    parser.add_argument("--report", default=None,
                        help="Output path for markdown report (auto-generates timestamped name if not specified)")
    parser.add_argument("--json", default=None,
                        help="Output path for JSON results (auto-generates timestamped name if not specified)")
    parser.add_argument("--max-phases", type=int, default=None,
                        help="Maximum number of phases to analyze (useful for testing)")
    parser.add_argument("--max-concurrent", type=int, default=5,
                        help="Maximum number of concurrent phase analyses (default: 5)")

    args = parser.parse_args()

    # Ensure the game_moments directory exists
    game_moments_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "game_moments")
    os.makedirs(game_moments_dir, exist_ok=True)

    # Both default file names share one timestamp so the report and the data
    # file of a run are easy to pair up.
    results_folder_name = os.path.basename(os.path.normpath(args.results_folder))
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    if args.report is None:
        args.report = os.path.join(game_moments_dir, f"{results_folder_name}_report_{timestamp}.md")

    if args.json is None:
        args.json = os.path.join(game_moments_dir, f"{results_folder_name}_data_{timestamp}.json")

    analyzer = GameAnalyzer(args.results_folder, args.model)

    try:
        await analyzer.initialize()
        await analyzer.analyze_game(max_phases=args.max_phases, max_concurrent=args.max_concurrent)
        report_path = await analyzer.generate_report(args.report)
        json_path = analyzer.save_json_results(args.json)

        # Print summary
        print("\nAnalysis Complete!")
        print(f"Found {len(analyzer.moments)} key moments")
        print(f"Report saved to: {report_path}")
        print(f"JSON data saved to: {json_path}")

        # Show score distribution
        scores = [m.interest_score for m in analyzer.moments]
        print("\nScore Distribution:")
        print(f" Scores 9-10: {sum(1 for s in scores if s >= 9)}")
        print(f" Scores 7-8: {sum(1 for s in scores if 7 <= s < 9)}")
        print(f" Scores 4-6: {sum(1 for s in scores if 4 <= s < 7)}")
        print(f" Scores 1-3: {sum(1 for s in scores if s < 4)}")

        # Show top 3 moments
        print("\nTop 3 Most Interesting Moments:")
        for i, moment in enumerate(analyzer.moments[:3], 1):
            powers_str = ', '.join([analyzer.format_power_with_model(p) for p in moment.powers_involved])
            # The impact text may be missing; fall back to 'N/A' as the report
            # does, and only add an ellipsis when the text was actually cut.
            impact = str(moment.impact) if moment.impact else "N/A"
            impact_preview = impact if len(impact) <= 100 else f"{impact[:100]}..."
            print(f"{i}. {moment.category} in {moment.phase} (Score: {moment.interest_score})")
            print(f" Powers: {powers_str}")
            print(f" Impact: {impact_preview}")
            print()

    except Exception as e:
        logger.error(f"Analysis failed: {e}")
        raise
|
|
|
|
if __name__ == "__main__":
    # Run the async command-line entry point to completion when executed as a script.
    asyncio.run(main())