mirror of
https://github.com/GoodStartLabs/AI_Diplomacy.git
synced 2026-04-19 12:58:09 +00:00
1129 lines
No EOL
51 KiB
Python
1129 lines
No EOL
51 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Analyze Key Game Moments: Betrayals, Collaborations, and Playing Both Sides
|
|
LLM-Based Version - Uses language models instead of regex for promise/lie detection
|
|
|
|
This script analyzes Diplomacy game data to identify the most interesting strategic moments.
|
|
Enhanced with:
|
|
- LLM-based promise extraction and lie detection
|
|
- Two-stage analysis (broad detection then deep analysis)
|
|
- Complete game narrative generation
|
|
- More accurate intent analysis from diary entries
|
|
"""
|
|
|
|
import json
|
|
import asyncio
|
|
import argparse
|
|
import logging
|
|
import csv
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Any
|
|
from dataclasses import dataclass, asdict, field
|
|
from datetime import datetime
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
# Import the client from ai_diplomacy module
|
|
from ai_diplomacy.clients import load_model_client
|
|
|
|
load_dotenv()
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
|
|
class GameMoment:
|
|
"""Represents a key moment in the game"""
|
|
phase: str
|
|
category: str # BETRAYAL, COLLABORATION, PLAYING_BOTH_SIDES, BRILLIANT_STRATEGY, STRATEGIC_BLUNDER
|
|
powers_involved: List[str]
|
|
promise_agreement: str
|
|
actual_action: str
|
|
impact: str
|
|
interest_score: float
|
|
raw_messages: List[Dict]
|
|
raw_orders: Dict
|
|
diary_context: Dict[str, str] # New field for diary entries
|
|
state_update_context: Dict[str, str] = None # New field for state updates
|
|
|
|
@dataclass
|
|
class Lie:
|
|
"""Represents a detected lie in diplomatic communications"""
|
|
phase: str
|
|
liar: str
|
|
recipient: str
|
|
promise: str
|
|
diary_intent: str
|
|
actual_action: str
|
|
intentional: bool
|
|
explanation: str
|
|
|
|
class GameAnalyzer:
|
|
"""Analyzes Diplomacy game data for key strategic moments using LLM"""
|
|
|
|
def __init__(self, results_folder: str, model_name: str = "openrouter-google/gemini-2.5-flash-preview"):
|
|
self.results_folder = Path(results_folder)
|
|
self.game_data_path = self.results_folder / "lmvsgame.json"
|
|
self.overview_path = self.results_folder / "overview.jsonl"
|
|
self.csv_path = self.results_folder / "llm_responses.csv"
|
|
self.model_name = model_name
|
|
self.client = None
|
|
self.game_data = None
|
|
self.power_to_model = None
|
|
self.moments = []
|
|
self.diary_entries = {} # phase -> power -> diary content
|
|
self.state_updates = {} # phase -> power -> state update content
|
|
self.invalid_moves_by_model = {} # Initialize attribute
|
|
self.lies = [] # Track detected lies
|
|
self.lies_by_model = {} # model -> {intentional: count, unintentional: count}
|
|
|
|
async def initialize(self):
|
|
"""Initialize the analyzer with game data and model client"""
|
|
# Load game data
|
|
with open(self.game_data_path, 'r') as f:
|
|
self.game_data = json.load(f)
|
|
|
|
# Load power-to-model mapping from overview.jsonl
|
|
with open(self.overview_path, 'r') as f:
|
|
lines = f.readlines()
|
|
# Second line contains the power-to-model mapping
|
|
if len(lines) >= 2:
|
|
self.power_to_model = json.loads(lines[1])
|
|
logger.info(f"Loaded power-to-model mapping: {self.power_to_model}")
|
|
else:
|
|
logger.warning("Could not find power-to-model mapping in overview.jsonl")
|
|
self.power_to_model = {}
|
|
|
|
# Load diary entries from CSV
|
|
self.diary_entries = self.parse_llm_responses_csv()
|
|
logger.info(f"Loaded diary entries for {len(self.diary_entries)} phases")
|
|
|
|
# Load state updates from CSV
|
|
self.state_updates = self.parse_state_updates_csv()
|
|
logger.info(f"Loaded state updates for {len(self.state_updates)} phases")
|
|
|
|
# Load invalid moves data from CSV
|
|
self.invalid_moves_by_model = self.parse_invalid_moves_from_csv()
|
|
logger.info(f"Loaded invalid moves for {len(self.invalid_moves_by_model)} models")
|
|
|
|
# Initialize model client
|
|
self.client = load_model_client(self.model_name)
|
|
logger.info(f"Initialized with model: {self.model_name}")
|
|
|
|
def parse_llm_responses_csv(self) -> Dict[str, Dict[str, str]]:
|
|
"""Parse the CSV file to extract diary entries by phase and power"""
|
|
diary_entries = {}
|
|
|
|
try:
|
|
import pandas as pd
|
|
# Use pandas for more robust CSV parsing
|
|
df = pd.read_csv(self.csv_path)
|
|
|
|
# Filter for negotiation diary entries
|
|
diary_df = df[df['response_type'] == 'negotiation_diary']
|
|
|
|
for _, row in diary_df.iterrows():
|
|
phase = row['phase']
|
|
power = row['power']
|
|
raw_response = str(row['raw_response']).strip()
|
|
|
|
if phase not in diary_entries:
|
|
diary_entries[phase] = {}
|
|
|
|
try:
|
|
# Try to parse as JSON first
|
|
response = json.loads(raw_response)
|
|
diary_content = f"Negotiation Summary: {response.get('negotiation_summary', 'N/A')}\n"
|
|
diary_content += f"Intent: {response.get('intent', 'N/A')}\n"
|
|
relationships = response.get('updated_relationships', {})
|
|
if isinstance(relationships, dict):
|
|
diary_content += f"Relationships: {relationships}"
|
|
else:
|
|
diary_content += f"Relationships: {relationships}"
|
|
diary_entries[phase][power] = diary_content
|
|
except (json.JSONDecodeError, TypeError):
|
|
# If JSON parsing fails, use a simplified version or skip
|
|
if raw_response and raw_response.lower() not in ['null', 'nan', 'none']:
|
|
diary_entries[phase][power] = f"Raw diary: {raw_response}"
|
|
|
|
logger.info(f"Successfully parsed {len(diary_entries)} phases with diary entries")
|
|
return diary_entries
|
|
|
|
except ImportError:
|
|
# Fallback to standard CSV if pandas not available
|
|
logger.info("Pandas not available, using standard CSV parsing")
|
|
import csv
|
|
|
|
with open(self.csv_path, 'r', encoding='utf-8') as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
try:
|
|
if row.get('response_type') == 'negotiation_diary':
|
|
phase = row.get('phase', '')
|
|
power = row.get('power', '')
|
|
|
|
if phase and power:
|
|
if phase not in diary_entries:
|
|
diary_entries[phase] = {}
|
|
|
|
raw_response = row.get('raw_response', '').strip()
|
|
|
|
try:
|
|
# Try to parse as JSON
|
|
response = json.loads(raw_response)
|
|
diary_content = f"Negotiation Summary: {response.get('negotiation_summary', 'N/A')}\n"
|
|
diary_content += f"Intent: {response.get('intent', 'N/A')}\n"
|
|
diary_content += f"Relationships: {response.get('updated_relationships', 'N/A')}"
|
|
diary_entries[phase][power] = diary_content
|
|
except (json.JSONDecodeError, TypeError):
|
|
if raw_response and raw_response != "null":
|
|
diary_entries[phase][power] = f"Raw diary: {raw_response}"
|
|
except Exception as e:
|
|
continue # Skip problematic rows
|
|
|
|
return diary_entries
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error parsing CSV file: {e}")
|
|
return {}
|
|
|
|
def parse_state_updates_csv(self) -> Dict[str, Dict[str, str]]:
|
|
"""Parse the CSV file to extract state updates by phase and power"""
|
|
state_updates = {}
|
|
|
|
try:
|
|
import pandas as pd
|
|
# Use pandas for more robust CSV parsing
|
|
df = pd.read_csv(self.csv_path)
|
|
|
|
# Filter for state update entries
|
|
state_df = df[df['response_type'] == 'state_update']
|
|
|
|
for _, row in state_df.iterrows():
|
|
phase = row['phase']
|
|
power = row['power']
|
|
raw_response = str(row['raw_response']).strip()
|
|
|
|
if phase not in state_updates:
|
|
state_updates[phase] = {}
|
|
|
|
try:
|
|
# Try to parse as JSON first
|
|
response = json.loads(raw_response)
|
|
state_content = f"Reasoning: {response.get('reasoning', 'N/A')}\n"
|
|
state_content += f"Relationships: {response.get('relationships', {})}\n"
|
|
goals = response.get('goals', [])
|
|
if isinstance(goals, list):
|
|
state_content += f"Goals: {'; '.join(goals)}"
|
|
else:
|
|
state_content += f"Goals: {goals}"
|
|
state_updates[phase][power] = state_content
|
|
except (json.JSONDecodeError, TypeError):
|
|
# If JSON parsing fails, use a simplified version or skip
|
|
if raw_response and raw_response.lower() not in ['null', 'nan', 'none']:
|
|
state_updates[phase][power] = f"Raw state update: {raw_response}"
|
|
|
|
logger.info(f"Successfully parsed {len(state_updates)} phases with state updates")
|
|
return state_updates
|
|
|
|
except ImportError:
|
|
# Fallback to standard CSV if pandas not available
|
|
logger.info("Pandas not available, using standard CSV parsing for state updates")
|
|
import csv
|
|
|
|
with open(self.csv_path, 'r', encoding='utf-8') as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
try:
|
|
if row.get('response_type') == 'state_update':
|
|
phase = row.get('phase', '')
|
|
power = row.get('power', '')
|
|
|
|
if phase and power:
|
|
if phase not in state_updates:
|
|
state_updates[phase] = {}
|
|
|
|
raw_response = row.get('raw_response', '').strip()
|
|
|
|
try:
|
|
# Try to parse as JSON
|
|
response = json.loads(raw_response)
|
|
state_content = f"Reasoning: {response.get('reasoning', 'N/A')}\n"
|
|
state_content += f"Relationships: {response.get('relationships', {})}\n"
|
|
goals = response.get('goals', [])
|
|
if isinstance(goals, list):
|
|
state_content += f"Goals: {'; '.join(goals)}"
|
|
else:
|
|
state_content += f"Goals: {goals}"
|
|
state_updates[phase][power] = state_content
|
|
except (json.JSONDecodeError, TypeError):
|
|
if raw_response and raw_response != "null":
|
|
state_updates[phase][power] = f"Raw state update: {raw_response}"
|
|
except Exception as e:
|
|
continue # Skip problematic rows
|
|
|
|
return state_updates
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error parsing state updates from CSV file: {e}")
|
|
return {}
|
|
|
|
def parse_invalid_moves_from_csv(self) -> Dict[str, int]:
|
|
"""Parse the CSV file to count invalid moves by model"""
|
|
invalid_moves_by_model = {}
|
|
|
|
try:
|
|
import pandas as pd
|
|
# Use pandas for more robust CSV parsing
|
|
df = pd.read_csv(self.csv_path)
|
|
|
|
# Look for failures in the success column
|
|
failure_df = df[df['success'].str.contains('Failure: Invalid LLM Moves', na=False)]
|
|
|
|
for _, row in failure_df.iterrows():
|
|
model = row['model']
|
|
success_text = str(row['success'])
|
|
|
|
# Extract the number from "Failure: Invalid LLM Moves (N):"
|
|
import re
|
|
match = re.search(r'Invalid LLM Moves \((\d+)\)', success_text)
|
|
if match:
|
|
invalid_count = int(match.group(1))
|
|
if model not in invalid_moves_by_model:
|
|
invalid_moves_by_model[model] = 0
|
|
invalid_moves_by_model[model] += invalid_count
|
|
|
|
logger.info(f"Successfully parsed invalid moves for {len(invalid_moves_by_model)} models")
|
|
return invalid_moves_by_model
|
|
|
|
except ImportError:
|
|
# Fallback to standard CSV if pandas not available
|
|
logger.info("Pandas not available, using standard CSV parsing for invalid moves")
|
|
import csv
|
|
import re
|
|
|
|
with open(self.csv_path, 'r', encoding='utf-8') as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
try:
|
|
success_text = row.get('success', '')
|
|
if 'Failure: Invalid LLM Moves' in success_text:
|
|
model = row.get('model', '')
|
|
match = re.search(r'Invalid LLM Moves \((\d+)\)', success_text)
|
|
if match and model:
|
|
invalid_count = int(match.group(1))
|
|
if model not in invalid_moves_by_model:
|
|
invalid_moves_by_model[model] = 0
|
|
invalid_moves_by_model[model] += invalid_count
|
|
except Exception as e:
|
|
continue # Skip problematic rows
|
|
|
|
return invalid_moves_by_model
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error parsing invalid moves from CSV file: {e}")
|
|
return {}
|
|
|
|
def extract_turn_data(self, phase_data: Dict) -> Dict:
|
|
"""Extract relevant data from a single turn/phase"""
|
|
phase_name = phase_data.get("name", "")
|
|
|
|
# Get diary entries for this phase
|
|
phase_diaries = self.diary_entries.get(phase_name, {})
|
|
|
|
# Get state updates for this phase
|
|
phase_state_updates = self.state_updates.get(phase_name, {})
|
|
|
|
return {
|
|
"phase": phase_name,
|
|
"messages": phase_data.get("messages", []),
|
|
"orders": phase_data.get("orders", {}),
|
|
"summary": phase_data.get("summary", ""),
|
|
"statistical_summary": phase_data.get("statistical_summary", {}),
|
|
"diaries": phase_diaries,
|
|
"state_updates": phase_state_updates
|
|
}
|
|
|
|
def create_analysis_prompt(self, turn_data: Dict) -> str:
|
|
"""Create the analysis prompt for a single turn"""
|
|
# Format messages for analysis
|
|
formatted_messages = []
|
|
for msg in turn_data.get("messages", []):
|
|
sender = msg.get('sender', 'Unknown')
|
|
sender_model = self.power_to_model.get(sender, '')
|
|
sender_str = f"{sender} ({sender_model})" if sender_model else sender
|
|
|
|
recipient = msg.get('recipient', 'Unknown')
|
|
recipient_model = self.power_to_model.get(recipient, '')
|
|
recipient_str = f"{recipient} ({recipient_model})" if recipient_model else recipient
|
|
|
|
formatted_messages.append(
|
|
f"{sender_str} to {recipient_str}: {msg.get('message', '')}"
|
|
)
|
|
|
|
# Format orders for analysis
|
|
formatted_orders = []
|
|
for power, power_orders in turn_data.get("orders", {}).items():
|
|
power_model = self.power_to_model.get(power, '')
|
|
power_str = f"{power} ({power_model})" if power_model else power
|
|
formatted_orders.append(f"{power_str}: {power_orders}")
|
|
|
|
# Format diary entries
|
|
formatted_diaries = []
|
|
for power, diary in turn_data.get("diaries", {}).items():
|
|
power_model = self.power_to_model.get(power, '')
|
|
power_str = f"{power} ({power_model})" if power_model else power
|
|
formatted_diaries.append(f"{power_str} DIARY:\n{diary}")
|
|
|
|
# Format state updates
|
|
formatted_state_updates = []
|
|
for power, state_update in turn_data.get("state_updates", {}).items():
|
|
power_model = self.power_to_model.get(power, '')
|
|
power_str = f"{power} ({power_model})" if power_model else power
|
|
formatted_state_updates.append(f"{power_str} STATE UPDATE:\n{state_update}")
|
|
|
|
prompt = f"""You are analyzing diplomatic negotiations and subsequent military orders from a Diplomacy game. Your task is to identify ONLY the most significant strategic moments.
|
|
|
|
CRITICAL: 90% of game turns contain NO moments worth reporting. Only identify moments that meet these strict criteria:
|
|
|
|
CATEGORIES:
|
|
1. BETRAYAL: Explicit promise broken that directly causes supply center loss
|
|
2. COLLABORATION: Successful coordination that captures/defends supply centers
|
|
3. PLAYING_BOTH_SIDES: Conflicting promises that manipulate the game's outcome
|
|
4. BRILLIANT_STRATEGY: Moves that gain 2+ centers or save from elimination
|
|
5. STRATEGIC_BLUNDER: Errors that lose 2+ centers or enable enemy victory
|
|
|
|
STRICT SCORING RUBRIC:
|
|
- Scores 1-6: DO NOT REPORT THESE. Routine diplomacy, expected moves.
|
|
- Score 7: Supply center changes hands due to this specific action
|
|
- Score 8: Multiple centers affected or major power dynamic shift
|
|
- Score 9: Completely alters the game trajectory (power eliminated, alliance system collapses)
|
|
- Score 10: Once-per-game brilliance or catastrophe that determines the winner
|
|
|
|
REQUIREMENTS FOR ANY REPORTED MOMENT:
|
|
✓ Supply centers must change hands as a direct result
|
|
✓ The action must be surprising given prior context
|
|
✓ The impact must be immediately measurable
|
|
✓ This must be a top-20 moment in the entire game
|
|
|
|
Examples of what NOT to report:
|
|
- Routine support orders that work as planned
|
|
- Minor position improvements
|
|
- Vague diplomatic promises
|
|
- Failed attacks with no consequences
|
|
- Defensive holds that maintain status quo
|
|
|
|
For this turn ({turn_data.get('phase', '')}), analyze:
|
|
|
|
PRIVATE DIARY ENTRIES (Powers' internal thoughts):
|
|
{chr(10).join(formatted_diaries) if formatted_diaries else 'No diary entries available'}
|
|
|
|
MESSAGES:
|
|
{chr(10).join(formatted_messages) if formatted_messages else 'No messages this turn'}
|
|
|
|
ORDERS:
|
|
{chr(10).join(formatted_orders) if formatted_orders else 'No orders this turn'}
|
|
|
|
TURN SUMMARY:
|
|
{turn_data.get('summary', 'No summary available')}
|
|
|
|
STATE UPDATES (Powers' reactions after seeing results):
|
|
{chr(10).join(formatted_state_updates) if formatted_state_updates else 'No state updates available'}
|
|
|
|
Identify ALL instances that fit the five categories. For each instance provide:
|
|
{{
|
|
"category": "BETRAYAL" or "COLLABORATION" or "PLAYING_BOTH_SIDES" or "BRILLIANT_STRATEGY" or "STRATEGIC_BLUNDER",
|
|
"powers_involved": ["POWER1", "POWER2", ...],
|
|
"promise_agreement": "What was promised/agreed/intended (or strategy attempted)",
|
|
"actual_action": "What actually happened",
|
|
"impact": "Strategic impact on the game",
|
|
"interest_score": 6.5 // 1-10 scale, be STRICT with high scores
|
|
}}
|
|
|
|
Use the diary entries to verify:
|
|
- Whether actions align with stated intentions
|
|
- Hidden motivations behind diplomatic moves
|
|
- Contradictions between public promises and private plans
|
|
- Strategic planning and its execution
|
|
|
|
Return your response as a JSON array of detected moments. If no relevant moments are found, return an empty array [].
|
|
|
|
Focus on:
|
|
- Comparing diary intentions vs actual orders
|
|
- Explicit promises vs actual orders
|
|
- Coordinated attacks or defenses
|
|
- DMZ violations
|
|
- Support promises kept or broken
|
|
- Conflicting negotiations with different powers
|
|
- Clever strategic positioning
|
|
- Missed strategic opportunities
|
|
- Tactical errors that cost supply centers
|
|
|
|
PROVIDE YOUR RESPONSE BELOW:"""
|
|
return prompt
|
|
|
|
async def analyze_turn(self, phase_data: Dict) -> List[Dict]:
|
|
"""Analyze a single turn for key moments"""
|
|
turn_data = self.extract_turn_data(phase_data)
|
|
|
|
# Skip if no meaningful data
|
|
if not turn_data["messages"] and not turn_data["orders"]:
|
|
return []
|
|
|
|
prompt = self.create_analysis_prompt(turn_data)
|
|
|
|
try:
|
|
response = await self.client.generate_response(prompt)
|
|
|
|
# Parse JSON response
|
|
# Handle potential code blocks or direct JSON
|
|
if "```json" in response:
|
|
response = response.split("```json")[1].split("```")[0]
|
|
elif "```" in response:
|
|
response = response.split("```")[1].split("```")[0]
|
|
|
|
detected_moments = json.loads(response)
|
|
|
|
# Enrich with raw data
|
|
moments = []
|
|
for moment in detected_moments:
|
|
game_moment = GameMoment(
|
|
phase=turn_data["phase"],
|
|
category=moment.get("category", ""),
|
|
powers_involved=moment.get("powers_involved", []),
|
|
promise_agreement=moment.get("promise_agreement", ""),
|
|
actual_action=moment.get("actual_action", ""),
|
|
impact=moment.get("impact", ""),
|
|
interest_score=float(moment.get("interest_score", 5)),
|
|
raw_messages=turn_data["messages"],
|
|
raw_orders=turn_data["orders"],
|
|
diary_context=turn_data["diaries"],
|
|
state_update_context=turn_data["state_updates"]
|
|
)
|
|
moments.append(game_moment)
|
|
logger.info(f"Detected {game_moment.category} in {game_moment.phase} "
|
|
f"(score: {game_moment.interest_score})")
|
|
|
|
return moments
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error analyzing turn {turn_data.get('phase', '')}: {e}")
|
|
return []
|
|
|
|
async def detect_lies_in_phase(self, phase_data: Dict) -> List[Lie]:
|
|
"""Detect lies by using LLM to analyze messages, diary entries, and actual orders"""
|
|
phase_name = phase_data.get("name", "")
|
|
messages = phase_data.get("messages", [])
|
|
orders = phase_data.get("orders", {})
|
|
diaries = self.diary_entries.get(phase_name, {})
|
|
|
|
detected_lies = []
|
|
|
|
# Group messages by sender
|
|
messages_by_sender = {}
|
|
for msg in messages:
|
|
sender = msg.get('sender', '')
|
|
if sender not in messages_by_sender:
|
|
messages_by_sender[sender] = []
|
|
messages_by_sender[sender].append(msg)
|
|
|
|
# Analyze each power's messages against their diary and orders
|
|
for sender, sent_messages in messages_by_sender.items():
|
|
sender_diary = diaries.get(sender, '')
|
|
sender_orders = orders.get(sender, [])
|
|
|
|
# Use LLM to analyze promises and lies for this sender
|
|
lie_analysis = await self.analyze_sender_promises(
|
|
sender, sent_messages, sender_orders, sender_diary, phase_name
|
|
)
|
|
detected_lies.extend(lie_analysis)
|
|
|
|
return detected_lies
|
|
|
|
async def analyze_sender_promises(self, sender: str, messages: List[Dict],
|
|
actual_orders: List[str], diary: str,
|
|
phase: str) -> List[Lie]:
|
|
"""Use LLM to analyze a sender's messages for promises and check if they were kept"""
|
|
|
|
# Skip if no messages to analyze
|
|
if not messages:
|
|
return []
|
|
|
|
# Create prompt for LLM to analyze promises and lies
|
|
prompt = self.create_lie_detection_prompt(sender, messages, actual_orders, diary, phase)
|
|
|
|
try:
|
|
response = await self.client.generate_response(prompt)
|
|
|
|
# Parse JSON response
|
|
if "```json" in response:
|
|
response = response.split("```json")[1].split("```")[0]
|
|
elif "```" in response:
|
|
response = response.split("```")[1].split("```")[0]
|
|
|
|
detected_lies_data = json.loads(response)
|
|
|
|
# Convert to Lie objects
|
|
lies = []
|
|
for lie_data in detected_lies_data:
|
|
lie = Lie(
|
|
phase=phase,
|
|
liar=sender,
|
|
recipient=lie_data.get("recipient", ""),
|
|
promise=lie_data.get("promise", ""),
|
|
diary_intent=lie_data.get("diary_intent", ""),
|
|
actual_action=lie_data.get("actual_action", ""),
|
|
intentional=lie_data.get("is_intentional", False),
|
|
explanation="Intentional deception" if lie_data.get("is_intentional", False) else "Possible misunderstanding or changed circumstances"
|
|
)
|
|
lies.append(lie)
|
|
|
|
return lies
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error analyzing promises for {sender} in {phase}: {e}")
|
|
return []
|
|
|
|
def create_lie_detection_prompt(self, sender: str, messages: List[Dict],
|
|
actual_orders: List[str], diary: str, phase: str) -> str:
|
|
"""Create a prompt for LLM to detect lies"""
|
|
|
|
# Format messages for the prompt
|
|
messages_text = ""
|
|
for msg in messages:
|
|
recipient = msg.get('recipient', '')
|
|
text = msg.get('message', '')
|
|
messages_text += f"\nTo {recipient}: {text}\n"
|
|
|
|
prompt = f"""Analyze these diplomatic messages from {sender} in phase {phase} to identify any lies or broken promises.
|
|
|
|
MESSAGES SENT BY {sender}:
|
|
{messages_text}
|
|
|
|
ACTUAL ORDERS EXECUTED BY {sender}:
|
|
{', '.join(actual_orders) if actual_orders else 'No orders'}
|
|
|
|
DIARY ENTRY (showing {sender}'s private thoughts):
|
|
{diary if diary else 'No diary entry'}
|
|
|
|
INSTRUCTIONS:
|
|
1. Identify any explicit promises made in the messages. A promise is:
|
|
- A commitment to take a specific action (e.g., "I will support your move to Munich")
|
|
- An agreement about orders (e.g., "I'll move my fleet to the English Channel")
|
|
- A commitment NOT to do something (e.g., "I won't attack Venice")
|
|
- An agreement about territory (e.g., "Norway will remain neutral")
|
|
|
|
2. For each promise found:
|
|
- Check if it was kept by comparing to the actual orders
|
|
- Determine if any broken promise was intentional (planned deception visible in diary) or unintentional
|
|
- Only count it as a lie if the promise was clear and specific
|
|
|
|
3. Ignore:
|
|
- Vague statements or general intentions
|
|
- Conditional statements ("I might...", "I'm considering...")
|
|
- Discussions of hypothetical scenarios
|
|
- General diplomatic pleasantries
|
|
|
|
Return a JSON array of detected lies. For each lie include:
|
|
{{
|
|
"recipient": "POWER_NAME",
|
|
"promise": "The specific promise made (quote or paraphrase)",
|
|
"diary_intent": "Relevant diary entry showing intent (if any)",
|
|
"actual_action": "What actually happened instead",
|
|
"is_intentional": true/false (true if diary shows planned deception)
|
|
}}
|
|
|
|
If no lies are detected, return an empty array [].
|
|
|
|
Return ONLY the JSON array, no other text.
|
|
|
|
PROVIDE YOUR RESPONSE BELOW:"""
|
|
return prompt
|
|
|
|
async def analyze_game(self, max_phases: Optional[int] = None, max_concurrent: int = 5):
|
|
"""Analyze the entire game for key moments with concurrent processing
|
|
|
|
Args:
|
|
max_phases: Maximum number of phases to analyze (None = all)
|
|
max_concurrent: Maximum number of concurrent phase analyses
|
|
"""
|
|
phases = self.game_data.get("phases", [])
|
|
|
|
if max_phases is not None:
|
|
phases = phases[:max_phases]
|
|
logger.info(f"Analyzing first {len(phases)} phases (out of {len(self.game_data.get('phases', []))} total)...")
|
|
else:
|
|
logger.info(f"Analyzing {len(phases)} phases...")
|
|
|
|
# Process phases in batches to avoid overwhelming the API
|
|
all_moments = []
|
|
|
|
for i in range(0, len(phases), max_concurrent):
|
|
batch = phases[i:i + max_concurrent]
|
|
batch_start = i + 1
|
|
batch_end = min(i + max_concurrent, len(phases))
|
|
|
|
logger.info(f"Processing batch {batch_start}-{batch_end} of {len(phases)} phases...")
|
|
|
|
# Create tasks for concurrent processing
|
|
tasks = []
|
|
for j, phase_data in enumerate(batch):
|
|
phase_name = phase_data.get("name", f"Phase {i+j}")
|
|
logger.info(f"Starting analysis of phase {phase_name}")
|
|
task = self.analyze_turn(phase_data)
|
|
tasks.append(task)
|
|
|
|
# Wait for all tasks in this batch to complete
|
|
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# Process results and handle any exceptions
|
|
for j, result in enumerate(batch_results):
|
|
if isinstance(result, Exception):
|
|
phase_name = batch[j].get("name", f"Phase {i+j}")
|
|
logger.error(f"Error analyzing phase {phase_name}: {result}")
|
|
else:
|
|
all_moments.extend(result)
|
|
|
|
# Small delay between batches to be respectful to the API
|
|
if i + max_concurrent < len(phases):
|
|
logger.info(f"Batch complete. Waiting 2 seconds before next batch...")
|
|
await asyncio.sleep(2)
|
|
|
|
self.moments = all_moments
|
|
|
|
# Analyze lies separately
|
|
logger.info("Analyzing diplomatic lies...")
|
|
for phase_data in phases:
|
|
phase_lies = await self.detect_lies_in_phase(phase_data)
|
|
self.lies.extend(phase_lies)
|
|
|
|
# Count lies by model
|
|
for lie in self.lies:
|
|
liar_model = self.power_to_model.get(lie.liar, 'Unknown')
|
|
if liar_model not in self.lies_by_model:
|
|
self.lies_by_model[liar_model] = {'intentional': 0, 'unintentional': 0}
|
|
|
|
if lie.intentional:
|
|
self.lies_by_model[liar_model]['intentional'] += 1
|
|
else:
|
|
self.lies_by_model[liar_model]['unintentional'] += 1
|
|
|
|
# Sort moments by interest score
|
|
self.moments.sort(key=lambda m: m.interest_score, reverse=True)
|
|
|
|
logger.info(f"Analysis complete. Found {len(self.moments)} key moments and {len(self.lies)} lies.")
|
|
|
|
def format_power_with_model(self, power: str) -> str:
|
|
"""Format power name with model in parentheses"""
|
|
model = self.power_to_model.get(power, '')
|
|
return f"{power} ({model})" if model else power
|
|
|
|
def phase_sort_key(self, phase_name):
|
|
"""Create a sortable key for diplomacy phases like 'S1901M', 'F1901M', etc."""
|
|
# Extract season, year, and type
|
|
if not phase_name or len(phase_name) < 6:
|
|
return (0, 0, "")
|
|
|
|
try:
|
|
season = phase_name[0] # S, F, W
|
|
year = int(phase_name[1:5]) if phase_name[1:5].isdigit() else 0 # 1901, 1902, etc.
|
|
phase_type = phase_name[5:] # M, A, R
|
|
|
|
# Order: Spring (S) < Fall (F) < Winter (W)
|
|
season_order = {"S": 1, "F": 2, "W": 3}.get(season, 0)
|
|
|
|
return (year, season_order, phase_type)
|
|
except Exception:
|
|
return (0, 0, "")
|
|
|
|
async def generate_narrative(self) -> str:
|
|
"""Generate a narrative story of the game using phase summaries and top moments"""
|
|
# Collect all phase summaries
|
|
phase_summaries = []
|
|
phases_with_summaries = []
|
|
|
|
for phase in self.game_data.get("phases", []):
|
|
phase_name = phase.get("name", "")
|
|
summary = phase.get("summary", "").strip()
|
|
|
|
if summary:
|
|
phases_with_summaries.append(phase_name)
|
|
phase_summaries.append(f"{phase_name}: {summary}")
|
|
|
|
# Identify key moments by category
|
|
betrayals = [m for m in self.moments if m.category == "BETRAYAL" and m.interest_score >= 8][:5]
|
|
collaborations = [m for m in self.moments if m.category == "COLLABORATION" and m.interest_score >= 8][:5]
|
|
playing_both_sides = [m for m in self.moments if m.category == "PLAYING_BOTH_SIDES" and m.interest_score >= 8][:5]
|
|
brilliant_strategies = [m for m in self.moments if m.category == "BRILLIANT_STRATEGY" and m.interest_score >= 8][:5]
|
|
strategic_blunders = [m for m in self.moments if m.category == "STRATEGIC_BLUNDER" and m.interest_score >= 8][:5]
|
|
|
|
# Find the winner
|
|
final_phase = self.game_data.get("phases", [])[-1] if self.game_data.get("phases") else None
|
|
winner = None
|
|
if final_phase:
|
|
final_summary = final_phase.get("summary", "")
|
|
if "solo victory" in final_summary.lower() or "wins" in final_summary.lower():
|
|
# Extract winner from summary
|
|
for power in ["AUSTRIA", "ENGLAND", "FRANCE", "GERMANY", "ITALY", "RUSSIA", "TURKEY"]:
|
|
if power in final_summary:
|
|
winner = power
|
|
break
|
|
|
|
# Create the narrative prompt
|
|
narrative_prompt = f"""Generate a dramatic narrative of this Diplomacy game that covers the ENTIRE game from beginning to end. You should not spend too much time on any one phase. You should be telling stories across the whole game, focusing on the most important moments. Don't repeat yourself. Really think about the art of storytelling here and how to make this engaging, highlighting both the power and the model itself, which is more interesting throughout. Make sure you call back to relationships that used to exist and how things change throughout, and culminate in a satisfying ending.
|
|
|
|
POWER MODELS:
|
|
{chr(10).join([f"- {power}: {model}" for power, model in self.power_to_model.items()])}
|
|
|
|
PHASE SUMMARIES (in chronological order):
|
|
{chr(10).join(phase_summaries[:10])} # First few phases
|
|
...
|
|
{chr(10).join(phase_summaries[-10:])} # Last few phases
|
|
|
|
KEY BETRAYALS:
|
|
{chr(10).join([f"- {m.phase}: {', '.join(m.powers_involved)} - {m.promise_agreement}" for m in betrayals[:3]])}
|
|
|
|
KEY COLLABORATIONS:
|
|
{chr(10).join([f"- {m.phase}: {', '.join(m.powers_involved)} - {m.promise_agreement}" for m in collaborations[:3]])}
|
|
|
|
KEY INSTANCES OF PLAYING BOTH SIDES:
|
|
{chr(10).join([f"- {m.phase}: {', '.join(m.powers_involved)} - {m.promise_agreement}" for m in playing_both_sides[:3]])}
|
|
|
|
BRILLIANT STRATEGIES:
|
|
{chr(10).join([f"- {m.phase}: {', '.join(m.powers_involved)} - {m.promise_agreement}" for m in brilliant_strategies[:3]])}
|
|
|
|
STRATEGIC BLUNDERS:
|
|
{chr(10).join([f"- {m.phase}: {', '.join(m.powers_involved)} - {m.promise_agreement}" for m in strategic_blunders[:3]])}
|
|
|
|
FINAL OUTCOME: {winner + " achieves solo victory" if winner else "Draw or ongoing"}
|
|
|
|
Write a compelling narrative that:
|
|
1. Starts with the opening moves and initial diplomatic landscape
|
|
2. Covers the ENTIRE game progression, not just the beginning
|
|
3. Highlights key turning points and dramatic moments throughout
|
|
4. Shows how alliances formed, shifted, and broke over time
|
|
5. Explains the strategic evolution of the game
|
|
6. Builds to the dramatic conclusion
|
|
7. Names each power with their model in parentheses (e.g., "France (claude-opus-4-20250514)")
|
|
8. Is written as a single flowing paragraph
|
|
9. Captures the drama and tension of the entire game
|
|
10. Is well formatted with great spacing that makes it easy to read and breaks phases of the game into paragraphs
|
|
11. The whole thing should be relatively concise
|
|
|
|
PROVIDE YOUR NARRATIVE BELOW:"""
|
|
|
|
try:
|
|
narrative_response = await self.client.generate_response(narrative_prompt)
|
|
return narrative_response.strip()
|
|
except Exception as e:
|
|
logger.error(f"Error generating narrative: {e}")
|
|
# Fallback narrative
|
|
return f"The game began in Spring 1901 with seven powers vying for control of Europe. {winner + ' ultimately achieved a solo victory.' if winner else 'The game concluded without a clear victor.'}"
|
|
|
|
async def generate_report(self, output_path: Optional[str] = None) -> str:
|
|
"""Generate the full analysis report matching the exact format of existing reports"""
|
|
# Generate output path if not provided
|
|
if not output_path:
|
|
# Save directly in the results folder
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
output_path = self.results_folder / f"game_moments_report_{timestamp}.md"
|
|
|
|
# Ensure the parent directory exists
|
|
output_path = Path(output_path)
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Count moments by category
|
|
category_counts = {
|
|
"Betrayals": len([m for m in self.moments if m.category == "BETRAYAL"]),
|
|
"Collaborations": len([m for m in self.moments if m.category == "COLLABORATION"]),
|
|
"Playing Both Sides": len([m for m in self.moments if m.category == "PLAYING_BOTH_SIDES"]),
|
|
"Brilliant Strategies": len([m for m in self.moments if m.category == "BRILLIANT_STRATEGY"]),
|
|
"Strategic Blunders": len([m for m in self.moments if m.category == "STRATEGIC_BLUNDER"])
|
|
}
|
|
|
|
# Score distribution
|
|
score_dist = {
|
|
"9-10": len([m for m in self.moments if m.interest_score >= 9]),
|
|
"7-8": len([m for m in self.moments if 7 <= m.interest_score < 9]),
|
|
"4-6": len([m for m in self.moments if 4 <= m.interest_score < 7]),
|
|
"1-3": len([m for m in self.moments if m.interest_score < 4])
|
|
}
|
|
|
|
# Generate narrative
|
|
narrative = await self.generate_narrative()
|
|
|
|
# Start building the report
|
|
report = f"""# Diplomacy Game Analysis: Key Moments
|
|
Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
|
|
Game: {self.game_data_path}
|
|
|
|
## Game Narrative
|
|
|
|
{narrative}
|
|
|
|
---
|
|
|
|
## Summary
|
|
- Total moments analyzed: {len(self.moments)}
|
|
- Betrayals: {category_counts['Betrayals']}
|
|
- Collaborations: {category_counts['Collaborations']}
|
|
- Playing Both Sides: {category_counts['Playing Both Sides']}
|
|
- Brilliant Strategies: {category_counts['Brilliant Strategies']}
|
|
- Strategic Blunders: {category_counts['Strategic Blunders']}
|
|
|
|
## Score Distribution
|
|
- Scores 9-10: {score_dist['9-10']}
|
|
- Scores 7-8: {score_dist['7-8']}
|
|
- Scores 4-6: {score_dist['4-6']}
|
|
- Scores 1-3: {score_dist['1-3']}
|
|
|
|
## Power Models
|
|
|
|
"""
|
|
# Add power models
|
|
for power in sorted(self.power_to_model.keys()):
|
|
model = self.power_to_model[power]
|
|
report += f"- **{power}**: {model}\n"
|
|
|
|
# Add invalid moves by model
|
|
report += "\n## Invalid Moves by Model\n\n"
|
|
sorted_invalid = sorted(self.invalid_moves_by_model.items(), key=lambda x: x[1], reverse=True)
|
|
for model, count in sorted_invalid:
|
|
report += f"- **{model}**: {count} invalid moves\n"
|
|
|
|
# Add lies analysis
|
|
report += "\n## Lies Analysis\n\n### Lies by Model\n\n"
|
|
sorted_lies = sorted(self.lies_by_model.items(),
|
|
key=lambda x: x[1]['intentional'] + x[1]['unintentional'],
|
|
reverse=True)
|
|
for model, counts in sorted_lies:
|
|
total = counts['intentional'] + counts['unintentional']
|
|
report += f"- **{model}**: {total} total lies ({counts['intentional']} intentional, {counts['unintentional']} unintentional)\n"
|
|
|
|
# Add notable lies (first 5)
|
|
report += "\n### Notable Lies\n"
|
|
for i, lie in enumerate(self.lies[:5], 1):
|
|
report += f"\n#### {i}. {lie.phase} - {'Intentional Deception' if lie.intentional else 'Unintentional'}\n"
|
|
report += f"**{self.format_power_with_model(lie.liar)}** to **{self.format_power_with_model(lie.recipient)}**\n\n"
|
|
report += f"**Promise:** {lie.promise}\n\n"
|
|
report += f"**Diary Intent:** {lie.diary_intent}\n\n"
|
|
report += f"**Actual Action:** {lie.actual_action}\n"
|
|
|
|
# Add key strategic moments by category
|
|
report += "\n\n## Key Strategic Moments by Category\n"
|
|
|
|
categories = [
|
|
("Betrayals", "BETRAYAL", "When powers explicitly promised one action but took a contradictory action"),
|
|
("Collaborations", "COLLABORATION", "When powers successfully coordinated as agreed"),
|
|
("Playing Both Sides", "PLAYING_BOTH_SIDES", "When a power made conflicting promises to different parties"),
|
|
("Brilliant Strategies", "BRILLIANT_STRATEGY", "Exceptionally well-executed strategic maneuvers"),
|
|
("Strategic Blunders", "STRATEGIC_BLUNDER", "Major strategic mistakes that cost supply centers or position")
|
|
]
|
|
|
|
for category_name, category_code, description in categories:
|
|
report += f"\n### {category_name}\n_{description}_\n"
|
|
|
|
# Get top 5 moments for this category
|
|
category_moments = [m for m in self.moments if m.category == category_code]
|
|
category_moments.sort(key=lambda m: m.interest_score, reverse=True)
|
|
|
|
for i, moment in enumerate(category_moments[:5], 1):
|
|
report += f"\n#### {i}. {moment.phase} (Score: {moment.interest_score}/10)\n"
|
|
report += f"**Powers Involved:** {', '.join([self.format_power_with_model(p) for p in moment.powers_involved])}\n\n"
|
|
report += f"**Promise:** {moment.promise_agreement}\n\n"
|
|
report += f"**Actual Action:** {moment.actual_action}\n\n"
|
|
report += f"**Impact:** {moment.impact}\n\n"
|
|
|
|
# Add diary context
|
|
report += "**Diary Context:**\n\n"
|
|
for power in moment.powers_involved:
|
|
if power in moment.diary_context:
|
|
report += f"_{self.format_power_with_model(power)} Diary:_ {moment.diary_context[power]}\n\n"
|
|
|
|
# Add state update context
|
|
if moment.state_update_context:
|
|
report += "**State Update Context (Post-Action Reflections):**\n\n"
|
|
for power in moment.powers_involved:
|
|
if power in moment.state_update_context:
|
|
report += f"_{self.format_power_with_model(power)} State Update:_ {moment.state_update_context[power]}\n\n"
|
|
|
|
# Write to file
|
|
with open(output_path, 'w', encoding='utf-8') as f:
|
|
f.write(report)
|
|
|
|
logger.info(f"Report generated: {output_path}")
|
|
return str(output_path)
|
|
|
|
def save_json_results(self, output_path: Optional[str] = None) -> str:
|
|
"""Save all moments and lies as JSON in a unified format that works for both analysis and animation"""
|
|
# Generate output path if not provided
|
|
if not output_path:
|
|
# Save directly in the results folder as moments.json for direct use
|
|
output_path = self.results_folder / "moments.json"
|
|
|
|
output_path = Path(output_path)
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Calculate category counts
|
|
category_counts = {
|
|
"betrayals": len([m for m in self.moments if m.category == "BETRAYAL"]),
|
|
"collaborations": len([m for m in self.moments if m.category == "COLLABORATION"]),
|
|
"playing_both_sides": len([m for m in self.moments if m.category == "PLAYING_BOTH_SIDES"]),
|
|
"brilliant_strategies": len([m for m in self.moments if m.category == "BRILLIANT_STRATEGY"]),
|
|
"strategic_blunders": len([m for m in self.moments if m.category == "STRATEGIC_BLUNDER"])
|
|
}
|
|
|
|
# Calculate score distribution
|
|
score_dist = {
|
|
"scores_9_10": len([m for m in self.moments if m.interest_score >= 9]),
|
|
"scores_7_8": len([m for m in self.moments if 7 <= m.interest_score < 9]),
|
|
"scores_4_6": len([m for m in self.moments if 4 <= m.interest_score < 7]),
|
|
"scores_1_3": len([m for m in self.moments if m.interest_score < 4])
|
|
}
|
|
|
|
# Prepare unified data structure that includes all information
|
|
unified_data = {
|
|
# Animation-compatible metadata
|
|
"metadata": {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
"source_folder": str(self.results_folder),
|
|
"analysis_model": self.model_name,
|
|
"total_moments": len(self.moments),
|
|
"moment_categories": category_counts,
|
|
"score_distribution": score_dist,
|
|
# Additional comprehensive metadata
|
|
"game_results_folder": str(self.results_folder),
|
|
"analysis_timestamp": datetime.now().isoformat(),
|
|
"model_used": self.model_name,
|
|
"game_data_path": str(self.game_data_path),
|
|
"power_to_model": self.power_to_model
|
|
},
|
|
# Power models at root level for animation
|
|
"power_models": self.power_to_model,
|
|
# Moments at root level for animation
|
|
"moments": [self._moment_to_dict(moment) for moment in self.moments],
|
|
# Analysis results in nested structure for comprehensive analysis
|
|
"analysis_results": {
|
|
"moments": [self._moment_to_dict(moment) for moment in self.moments],
|
|
"lies": [asdict(lie) for lie in self.lies],
|
|
"invalid_moves_by_model": self.invalid_moves_by_model
|
|
},
|
|
"summary": {
|
|
"total_moments": len(self.moments),
|
|
"total_lies": len(self.lies),
|
|
"moments_by_category": {
|
|
"BETRAYAL": category_counts["betrayals"],
|
|
"COLLABORATION": category_counts["collaborations"],
|
|
"PLAYING_BOTH_SIDES": category_counts["playing_both_sides"],
|
|
"BRILLIANT_STRATEGY": category_counts["brilliant_strategies"],
|
|
"STRATEGIC_BLUNDER": category_counts["strategic_blunders"]
|
|
},
|
|
"lies_by_power": {},
|
|
"intentional_lies": len([l for l in self.lies if l.intentional]),
|
|
"unintentional_lies": len([l for l in self.lies if not l.intentional]),
|
|
"score_distribution": {
|
|
"9-10": score_dist["scores_9_10"],
|
|
"7-8": score_dist["scores_7_8"],
|
|
"4-6": score_dist["scores_4_6"],
|
|
"1-3": score_dist["scores_1_3"]
|
|
}
|
|
},
|
|
"phases_analyzed": list(set(moment.phase for moment in self.moments))
|
|
}
|
|
|
|
# Count lies by power
|
|
for lie in self.lies:
|
|
if lie.liar not in unified_data["summary"]["lies_by_power"]:
|
|
unified_data["summary"]["lies_by_power"][lie.liar] = 0
|
|
unified_data["summary"]["lies_by_power"][lie.liar] += 1
|
|
|
|
# Write to file with proper formatting
|
|
with open(output_path, 'w', encoding='utf-8') as f:
|
|
json.dump(unified_data, f, indent=2, ensure_ascii=False)
|
|
|
|
logger.info(f"Unified JSON results saved: {output_path}")
|
|
return str(output_path)
|
|
|
|
|
|
def _moment_to_dict(self, moment: GameMoment) -> dict:
|
|
"""Convert a GameMoment to a dictionary with all fields"""
|
|
# Ensure all powers have entries in diary_context
|
|
all_powers = ["AUSTRIA", "ENGLAND", "FRANCE", "GERMANY", "ITALY", "RUSSIA", "TURKEY"]
|
|
normalized_diary = {}
|
|
for power in all_powers:
|
|
normalized_diary[power] = moment.diary_context.get(power, "")
|
|
|
|
# Ensure all powers have entries in state_update_context
|
|
normalized_state_update = {}
|
|
state_update_context = moment.state_update_context if hasattr(moment, 'state_update_context') else {}
|
|
for power in all_powers:
|
|
normalized_state_update[power] = state_update_context.get(power, "")
|
|
|
|
return {
|
|
"phase": moment.phase,
|
|
"category": moment.category,
|
|
"powers_involved": moment.powers_involved,
|
|
"promise_agreement": moment.promise_agreement,
|
|
"actual_action": moment.actual_action,
|
|
"impact": moment.impact,
|
|
"interest_score": moment.interest_score,
|
|
"raw_messages": moment.raw_messages,
|
|
"raw_orders": moment.raw_orders,
|
|
"diary_context": normalized_diary,
|
|
"state_update_context": normalized_state_update
|
|
}
|
|
|
|
async def main():
|
|
"""Main entry point for the script"""
|
|
parser = argparse.ArgumentParser(description='Analyze Diplomacy game for key strategic moments using LLM')
|
|
parser.add_argument('results_folder', help='Path to the game results folder')
|
|
parser.add_argument('--model', default='openrouter-google/gemini-2.5-flash-preview',
|
|
help='Model to use for analysis')
|
|
parser.add_argument('--max-phases', type=int, help='Maximum number of phases to analyze')
|
|
parser.add_argument('--output', help='Output file path for the markdown report')
|
|
parser.add_argument('--json', help='Output file path for the JSON data (defaults to moments.json in results folder)')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Create analyzer
|
|
analyzer = GameAnalyzer(args.results_folder, args.model)
|
|
|
|
# Initialize
|
|
await analyzer.initialize()
|
|
|
|
# Analyze game
|
|
await analyzer.analyze_game(max_phases=args.max_phases)
|
|
|
|
# Generate coordinated outputs
|
|
# Always generate the report
|
|
report_path = await analyzer.generate_report(args.output)
|
|
|
|
# Generate JSON output - unified format that works for both analysis and animation
|
|
if args.json:
|
|
# Use the specified path
|
|
json_path = analyzer.save_json_results(args.json)
|
|
else:
|
|
# Default to moments.json in the results folder
|
|
json_path = analyzer.save_json_results()
|
|
|
|
# Print summary
|
|
print(f"\nAnalysis Complete!")
|
|
print(f"Found {len(analyzer.moments)} key moments")
|
|
print(f"Detected {len(analyzer.lies)} lies")
|
|
print(f"\nReport saved to: {report_path}")
|
|
print(f"JSON data saved to: {json_path}")
|
|
|
|
# Show score distribution
|
|
print("\nScore Distribution:")
|
|
print(f" Scores 9-10: {len([m for m in analyzer.moments if m.interest_score >= 9])}")
|
|
print(f" Scores 7-8: {len([m for m in analyzer.moments if 7 <= m.interest_score < 9])}")
|
|
print(f" Scores 4-6: {len([m for m in analyzer.moments if 4 <= m.interest_score < 7])}")
|
|
print(f" Scores 1-3: {len([m for m in analyzer.moments if m.interest_score < 4])}")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main()) |