diff --git a/analysis/statistical_game_analysis.py b/analysis/statistical_game_analysis.py
new file mode 100644
index 0000000..3061f2e
--- /dev/null
+++ b/analysis/statistical_game_analysis.py
@@ -0,0 +1,1131 @@
+#!/usr/bin/env python3
+"""
+Statistical Game Analysis for AI Diplomacy
+
+Production-ready tool that analyzes AI Diplomacy game results and generates comprehensive
+statistical analysis in CSV format. Supports both single game analysis and batch processing.
+
+Features:
+- Phase-level analysis with response-type granularity
+- Game-level aggregated metrics
+- Comprehensive failure/success tracking
+- Message composition and relationship analysis
+- Data validation and error handling
+
+Usage:
+    python statistical_game_analysis.py <results_folder>
+    python statistical_game_analysis.py <parent_folder> --multiple
+
+Author: Generated for AI Diplomacy project
+Version: 2.0 (Hard Mode with complete validation)
+"""
+
+import os
+import json
+import csv
+import argparse
+from pathlib import Path
+from collections import defaultdict, Counter
+import re
+from typing import Dict, List, Tuple, Optional, Any
+import statistics
+
+
+class StatisticalGameAnalyzer:
+    """Production-ready analyzer for AI Diplomacy game statistics.
+
+    This class handles comprehensive statistical analysis of AI Diplomacy games,
+    including negotiation patterns, relationship dynamics, response quality metrics,
+    and game state evolution. Designed for reliability and maintainability.
+    """
+
+    # Class constants for better maintainability
+    RELATIONSHIP_VALUES = {
+        'Enemy': -2,
+        'Unfriendly': -1,
+        'Neutral': 0,
+        'Friendly': 1,
+        'Ally': 2
+    }
+
+    DIPLOMACY_POWERS = ['AUSTRIA', 'ENGLAND', 'FRANCE', 'GERMANY', 'ITALY', 'RUSSIA', 'TURKEY']
+
+    # Complete list of response types found in actual data
+    RESPONSE_TYPES = [
+        'negotiation_message', 'negotiation_diary', 'state_update', 'initial_state_setup',
+        'order_generation', 'order_diary', 'state_update_parsing_empty_or_invalid_data',
+        'diary_consolidation', 'state_update_partial_data', 'state_update_no_response'
+    ]
+
+    def __init__(self):
+        """Initialize analyzer with configuration constants."""
+        self.relationship_values = self.RELATIONSHIP_VALUES
+        self.powers = self.DIPLOMACY_POWERS
+
+    def _normalize_recipient_name(self, recipient: str) -> Optional[str]:
+        """Normalize recipient names to handle LLM typos and abbreviations."""
+        if not recipient:
+            return recipient
+
+        recipient = recipient.upper().strip()
+
+        # Handle common LLM typos and abbreviations found in data
+        name_mapping = {
+            'EGMANY': 'GERMANY',
+            'GERMAN': 'GERMANY',
+            'UK': 'ENGLAND',
+            'BRIT': 'ENGLAND',
+            'ENGLAND': 'ENGLAND',  # Keep as-is
+            'FRANCE': 'FRANCE',  # Keep as-is
+            'GERMANY': 'GERMANY',  # Keep as-is
+            'ITALY': 'ITALY',  # Keep as-is
+            'AUSTRIA': 'AUSTRIA',  # Keep as-is
+            'RUSSIA': 'RUSSIA',  # Keep as-is
+            'TURKEY': 'TURKEY',  # Keep as-is
+        }
+
+        normalized = name_mapping.get(recipient, recipient)
+
+        # Validate it's a known power
+        if normalized not in self.DIPLOMACY_POWERS:
+            return None  # Invalid recipient
+
+        return normalized
+
+    def analyze_folder(self, folder_path: str, output_dir: str = None) -> Tuple[str, str]:
+        """
+        Analyze a single results folder and generate CSV outputs.
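+
+        Example (illustrative sketch; the results folder name below is hypothetical):
+
+            analyzer = StatisticalGameAnalyzer()
+            phase_csv, game_csv = analyzer.analyze_folder("results/game_20240101")
+            # By default this writes <game_id>_phase_analysis.csv and
+            # <game_id>_game_analysis.csv into an "analysis" subfolder of the input folder.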
+ + Args: + folder_path: Path to results folder containing llm_responses.csv and lmvsgame.json + output_dir: Directory to save outputs (default: analysis subfolder) + + Returns: + Tuple of (phase_csv_path, game_csv_path) + + Raises: + FileNotFoundError: If required data files are missing + ValueError: If data format is invalid + """ + folder_path = Path(folder_path) + + # Validate input folder exists + if not folder_path.exists() or not folder_path.is_dir(): + raise FileNotFoundError(f"Results folder not found: {folder_path}") + + # Set up output directory + if output_dir is None: + output_dir = folder_path / "analysis" + else: + output_dir = Path(output_dir) + + try: + output_dir.mkdir(exist_ok=True) + except PermissionError as e: + raise PermissionError(f"Cannot create output directory {output_dir}: {e}") + + print(f"Analyzing folder: {folder_path}") + + # Validate required files exist + llm_responses_path = folder_path / "llm_responses.csv" + game_json_path = folder_path / "lmvsgame.json" + + if not llm_responses_path.exists(): + raise FileNotFoundError(f"Required file missing: {llm_responses_path}") + if not game_json_path.exists(): + raise FileNotFoundError(f"Required file missing: {game_json_path}") + + try: + # Load and validate data + llm_responses = self._load_llm_responses(llm_responses_path) + if not llm_responses: + raise ValueError("llm_responses.csv is empty or contains no valid data") + + with open(game_json_path, 'r', encoding='utf-8') as f: + game_data = json.load(f) + + if not game_data.get('phases'): + raise ValueError("lmvsgame.json contains no phase data") + + # Generate analysis + phase_features = self._extract_phase_features(llm_responses, game_data) + game_features = self._extract_game_features(llm_responses, game_data) + + if not phase_features or not game_features: + raise ValueError("Failed to extract analysis features from data") + + # Save outputs + game_id = folder_path.name + phase_csv_path = output_dir / f"{game_id}_phase_analysis.csv" + game_csv_path = output_dir / f"{game_id}_game_analysis.csv" + + self._save_phase_csv(phase_features, phase_csv_path) + self._save_game_csv(game_features, game_csv_path) + + print(f"Saved {len(phase_features)} phase records to {phase_csv_path}") + print(f"Saved {len(game_features)} game records to {game_csv_path}") + + return str(phase_csv_path), str(game_csv_path) + + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in {game_json_path}: {e}") + except Exception as e: + raise RuntimeError(f"Analysis failed: {e}") from e + + def analyze_multiple_folders(self, parent_folder: str, output_dir: str = None) -> None: + """ + Analyze multiple results folders and combine outputs. 
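+
+        Example (illustrative sketch; the parent folder path below is hypothetical):
+
+            analyzer = StatisticalGameAnalyzer()
+            analyzer.analyze_multiple_folders("results/")
+            # By default, per-game CSVs are written under results/statistical_analysis/individual/
+            # and the combined CSVs under results/statistical_analysis/.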
+ + Args: + parent_folder: Path containing multiple results folders + output_dir: Directory to save combined outputs + """ + parent_path = Path(parent_folder) + if output_dir is None: + output_dir = parent_path / "statistical_analysis" + else: + output_dir = Path(output_dir) + output_dir.mkdir(exist_ok=True) + + # Find all results folders (look for folders with llm_responses.csv) + results_folders = [] + for item in parent_path.iterdir(): + if item.is_dir() and (item / "llm_responses.csv").exists(): + results_folders.append(item) + + if not results_folders: + raise ValueError(f"No results folders found in {parent_folder}") + + print(f"Found {len(results_folders)} results folders to analyze") + + all_phase_data = [] + all_game_data = [] + + # Analyze each folder + for folder in results_folders: + try: + print(f"\nAnalyzing {folder.name}...") + phase_csv, game_csv = self.analyze_folder(folder, output_dir / "individual") + + # Load and combine data + phase_data = self._load_csv_as_dicts(phase_csv) + game_data = self._load_csv_as_dicts(game_csv) + + all_phase_data.extend(phase_data) + all_game_data.extend(game_data) + + except Exception as e: + print(f"Error analyzing {folder.name}: {e}") + continue + + # Combine all data + if all_phase_data: + # Save combined outputs + combined_phase_path = output_dir / "combined_phase_analysis.csv" + combined_game_path = output_dir / "combined_game_analysis.csv" + + self._save_phase_csv(all_phase_data, combined_phase_path) + self._save_game_csv(all_game_data, combined_game_path) + + print(f"\nCombined phase analysis saved to: {combined_phase_path}") + print(f"Combined game analysis saved to: {combined_game_path}") + print(f"Total games analyzed: {len(set(row.get('game_id') for row in all_game_data))}") + print(f"Total phase records: {len(all_phase_data)}") + + def _load_llm_responses(self, csv_path: Path) -> List[dict]: + """Load and validate LLM responses CSV.""" + responses = [] + required_columns = ['model', 'power', 'phase', 'response_type', 'raw_response'] + + with open(csv_path, 'r', encoding='utf-8') as f: + reader = csv.DictReader(f) + + # Check required columns + missing_columns = [col for col in required_columns if col not in reader.fieldnames] + if missing_columns: + raise ValueError(f"Missing required columns in CSV: {missing_columns}") + + for row in reader: + responses.append(row) + + return responses + + def _extract_phase_features(self, llm_responses: List[dict], game_data: dict) -> List[dict]: + """Extract phase-level features for all powers, phases, and response types.""" + phase_features = [] + + # Get all unique phases from game data + phases = [phase['name'] for phase in game_data['phases']] + + # Use class constant for response types + response_types = self.RESPONSE_TYPES + + for phase_name in phases: + # Get phase data from game JSON + phase_data = next((p for p in game_data['phases'] if p['name'] == phase_name), None) + if not phase_data: + continue + + for power in self.powers: + for response_type in response_types: + # Extract features for this specific power/phase/response_type combination + features = self._extract_power_phase_response_features( + power, phase_name, response_type, llm_responses, phase_data, game_data + ) + if features: + phase_features.append(features) + + return phase_features + + def _extract_power_phase_response_features(self, power: str, phase: str, response_type: str, + llm_responses: List[dict], phase_data: dict, + game_data: dict) -> Optional[dict]: + """Extract features for a specific 
power/phase/response_type combination.""" + + # Get responses of this type for this power/phase + relevant_responses = [ + response for response in llm_responses + if (response.get('power') == power and + response.get('phase') == phase and + response.get('response_type') == response_type) + ] + + # Skip if no responses of this type + if not relevant_responses: + return None + + # Base feature dict with organized, descriptive names + features = { + # === PRIMARY IDENTIFIERS (ordered as requested) === + 'game_id': game_data.get('id', 'unknown'), + 'llm_model': self._get_model_for_power(power, llm_responses), + 'game_phase': phase, + 'analyzed_response_type': response_type, + 'power_name': power, + + # === RESPONSE INFO === + 'llm_responses_of_this_type': len(relevant_responses) + } + + # === FAILURE ANALYSIS (HARD MODE) === + failure_metrics = self._analyze_failures(power, phase, response_type, llm_responses) + features.update(failure_metrics) + + + # Add response-type specific features + if response_type == 'negotiation_message': + negotiation_features = self._extract_negotiation_features(power, phase, llm_responses, phase_data) + features.update(negotiation_features) + elif response_type in ['negotiation_diary', 'state_update', 'initial_state_setup']: + reflection_features = self._extract_reflection_features(power, phase, llm_responses, phase_data, game_data, response_type) + features.update(reflection_features) + + # Always include game state features for context + game_state_features = self._extract_game_state_features(power, phase, phase_data, game_data) + features.update(game_state_features) + + return features + + def _extract_negotiation_features(self, power: str, phase: str, + llm_responses: List[dict], phase_data: dict) -> dict: + """Extract negotiation-related metrics for a power in a phase.""" + + # Get negotiation messages for this power in this phase + negotiation_msgs = [ + response for response in llm_responses + if (response.get('power') == power and + response.get('phase') == phase and + response.get('response_type') == 'negotiation_message') + ] + + # Initialize negotiation features with descriptive names + features = { + # === NEGOTIATION METRICS === + 'total_messages_sent': 0, + 'messages_to_allies': 0, + 'messages_to_enemies': 0, + 'messages_to_neutrals': 0, + 'global_messages_count': 0, + 'private_messages_count': 0, + 'percent_global_messages': 0.0, + 'average_message_length_chars': 0.0 + } + + if not negotiation_msgs: + return features + + # Parse messages from raw responses + all_messages = [] + total_length = 0 + + for response in negotiation_msgs: + messages = self._parse_negotiation_messages(response.get('raw_response', ''), power, phase) + all_messages.extend(messages) + + if not all_messages: + return features + + # Get relationships for this phase + relationships = self._get_relationships_for_phase(power, phase, phase_data) + + # Calculate message statistics + features['total_messages_sent'] = len(all_messages) + + for msg in all_messages: + msg_length = len(msg.get('content', '')) + total_length += msg_length + + if msg.get('is_global', False): + features['global_messages_count'] += 1 + else: + features['private_messages_count'] += 1 + + # Categorize by relationship + recipient = msg.get('recipient_power') + normalized_recipient = self._normalize_recipient_name(recipient) + + # Skip self-messages and invalid recipients + if normalized_recipient and normalized_recipient != power and normalized_recipient in relationships: + rel_value = 
self.relationship_values.get(relationships[normalized_recipient], 0) + if rel_value >= 1: # Friendly or Ally + features['messages_to_allies'] += 1 + elif rel_value <= -1: # Enemy or Unfriendly + features['messages_to_enemies'] += 1 + else: # Neutral + features['messages_to_neutrals'] += 1 + + # Calculate percentages and averages + if features['total_messages_sent'] > 0: + features['percent_global_messages'] = (features['global_messages_count'] / features['total_messages_sent']) * 100 + features['average_message_length_chars'] = total_length / features['total_messages_sent'] + + # Calculate relationship-based message percentages + total_categorized = features['messages_to_allies'] + features['messages_to_enemies'] + features['messages_to_neutrals'] + if total_categorized > 0: + features['percent_messages_to_allies'] = (features['messages_to_allies'] / total_categorized) * 100 + features['percent_messages_to_enemies'] = (features['messages_to_enemies'] / total_categorized) * 100 + features['percent_messages_to_neutrals'] = (features['messages_to_neutrals'] / total_categorized) * 100 + else: + features['percent_messages_to_allies'] = 0.0 + features['percent_messages_to_enemies'] = 0.0 + features['percent_messages_to_neutrals'] = 0.0 + else: + features['percent_messages_to_allies'] = 0.0 + features['percent_messages_to_enemies'] = 0.0 + features['percent_messages_to_neutrals'] = 0.0 + + return features + + def _extract_reflection_features(self, power: str, phase: str, + llm_responses: List[dict], phase_data: dict, + game_data: dict, specific_response_type: str = None) -> dict: + """Extract reflection-related metrics for a power in a phase.""" + + features = { + # === REFLECTION METRICS === + 'llm_response_tokens_estimated': 0, + 'llm_response_time_ms': 0.0, + 'relationship_stability_vs_prev_phase': 1.0, + 'avg_sentiment_toward_others': 0.0, + 'avg_sentiment_from_others': 0.0, + 'sentiment_change_from_prev': 0.0 + } + + # Get diary entries for this power in this phase + if specific_response_type: + # Filter to only the specific response type + diary_entries = [ + response for response in llm_responses + if (response.get('power') == power and + response.get('phase') == phase and + response.get('response_type') == specific_response_type) + ] + else: + # Get all reflection-type responses + diary_entries = [ + response for response in llm_responses + if (response.get('power') == power and + response.get('phase') == phase and + response.get('response_type') in ['negotiation_diary', 'state_update', 'initial_state_setup']) + ] + + if not diary_entries: + return features + + # Calculate reflection metrics + total_tokens = 0 + for response in diary_entries: + response_text = str(response.get('raw_response', '')) + # Estimate tokens (rough approximation: words * 1.3) + word_count = len(response_text.split()) + total_tokens += int(word_count * 1.3) + + features['llm_response_tokens_estimated'] = total_tokens + + # Calculate relationship similarity with previous phase + current_relationships = self._get_relationships_for_phase(power, phase, phase_data) + prev_phase_data = self._get_previous_phase_data(phase, game_data) + + if prev_phase_data: + prev_relationships = self._get_relationships_for_phase(power, prev_phase_data['name'], prev_phase_data) + features['relationship_stability_vs_prev_phase'] = self._calculate_relationship_similarity( + prev_relationships, current_relationships + ) + + # Calculate sentiment metrics + sentiment_metrics = self._calculate_sentiment_metrics(power, phase, phase_data) + 
features.update(sentiment_metrics) + + return features + + def _extract_game_state_features(self, power: str, phase: str, + phase_data: dict, game_data: dict) -> dict: + """Extract game state metrics for a power in a phase.""" + + features = { + # === GAME STATE === + 'territories_controlled_count': 0, + 'supply_centers_owned_count': 0, + 'military_units_count': 0, + 'territories_gained_vs_prev_phase': 0, + 'supply_centers_gained_vs_prev_phase': 0, + 'military_units_gained_vs_prev_phase': 0 + } + + # Get current state + state = phase_data.get('state', {}) + + # Count current resources + units = state.get('units', {}).get(power, []) + centers = state.get('centers', {}).get(power, []) + influence = state.get('influence', {}).get(power, []) + + features['military_units_count'] = len(units) + features['supply_centers_owned_count'] = len(centers) + features['territories_controlled_count'] = len(influence) + + # Calculate deltas from previous phase + prev_phase_data = self._get_previous_phase_data(phase, game_data) + if prev_phase_data: + prev_state = prev_phase_data.get('state', {}) + prev_units = prev_state.get('units', {}).get(power, []) + prev_centers = prev_state.get('centers', {}).get(power, []) + prev_influence = prev_state.get('influence', {}).get(power, []) + + features['military_units_gained_vs_prev_phase'] = features['military_units_count'] - len(prev_units) + features['supply_centers_gained_vs_prev_phase'] = features['supply_centers_owned_count'] - len(prev_centers) + features['territories_gained_vs_prev_phase'] = features['territories_controlled_count'] - len(prev_influence) + + return features + + def _extract_game_features(self, llm_responses: List[dict], game_data: dict) -> List[dict]: + """Extract game-level features (placeholder for future implementation).""" + + game_features = [] + + for power in self.powers: + features = { + # === IDENTIFIERS === + 'game_id': game_data.get('id', 'unknown'), + 'llm_model': self._get_model_for_power(power, llm_responses), + 'power_name': power, + + # === FINAL STATE METRICS (End game snapshot) === + 'final_territories_controlled': 0, + 'final_supply_centers_owned': 0, + 'final_military_units': 0, + 'game_result': 'unknown', # win/loss/draw + 'final_ranking_by_supply_centers': 0, + + # === TOTALS (Complete game sums) === + 'total_negotiation_messages_sent': 0, + 'total_messages_to_allies': 0, + 'total_messages_to_enemies': 0, + 'total_messages_to_neutrals': 0, + 'total_global_messages': 0, + 'total_private_messages': 0, + 'total_response_tokens_estimated': 0, + 'total_llm_interactions': 0, + 'total_phases_active': 0, + + # === AVERAGES (Behavioral patterns over time) === + 'avg_negotiation_messages_per_phase': 0.0, + 'avg_relationship_stability_per_phase': 0.0, + 'avg_sentiment_toward_others': 0.0, + 'avg_sentiment_from_others': 0.0, + 'avg_response_tokens_per_interaction': 0.0, + 'avg_territories_controlled_per_phase': 0.0, + 'avg_supply_centers_owned_per_phase': 0.0, + 'avg_military_units_per_phase': 0.0, + 'percent_messages_to_allies_overall': 0.0, + 'percent_messages_to_enemies_overall': 0.0, + 'percent_global_vs_private_overall': 0.0, + + # === FAILURE ANALYSIS TOTALS (HARD MODE) === + 'total_llm_calls_overall': 0, + 'total_failed_llm_calls': 0, + 'total_success_llm_calls': 0, + 'overall_failure_rate_percentage': 0.0, + 'overall_success_rate_percentage': 0.0, + + } + + # === CALCULATE FINAL STATE METRICS === + if game_data['phases']: + final_phase = game_data['phases'][-1] + final_state = final_phase.get('state', {}) + + # Final counts + 
final_centers = final_state.get('centers', {}).get(power, []) + final_units = final_state.get('units', {}).get(power, []) + final_influence = final_state.get('influence', {}).get(power, []) + + features['final_supply_centers_owned'] = len(final_centers) + features['final_military_units'] = len(final_units) + features['final_territories_controlled'] = len(final_influence) + + # Calculate final ranking (1 = highest SC count, 7 = lowest) + all_final_centers = final_state.get('centers', {}) + sc_counts = [(len(centers), pwr) for pwr, centers in all_final_centers.items()] + sc_counts.sort(reverse=True) # Sort by SC count descending + + for rank, (sc_count, pwr) in enumerate(sc_counts, 1): + if pwr == power: + features['final_ranking_by_supply_centers'] = rank + break + + # Determine game result + if len(final_centers) >= 18: + features['game_result'] = 'solo_victory' + elif rank == 1: + features['game_result'] = 'leading' + elif rank <= 3: + features['game_result'] = 'survivor' + else: + features['game_result'] = 'eliminated_or_weak' + + # === CALCULATE AVERAGED BEHAVIORAL METRICS === + self._calculate_averaged_game_metrics(features, power, llm_responses, game_data) + + game_features.append(features) + + return game_features + + def _calculate_averaged_game_metrics(self, features: dict, power: str, + llm_responses: List[dict], game_data: dict) -> None: + """Calculate both totals and averaged behavioral metrics across the entire game.""" + + # Initialize collections + power_phases = [] + sentiment_toward_values = [] + sentiment_from_values = [] + territories_per_phase = [] + supply_centers_per_phase = [] + military_units_per_phase = [] + relationship_stability_values = [] + + # Track previous relationships for stability calculation + prev_relationships = None + + # Collect data from all phases + for phase in game_data['phases']: + phase_name = phase['name'] + power_phases.append(phase_name) + + # Get game state data for averages + state = phase.get('state', {}) + territories = len(state.get('influence', {}).get(power, [])) + supply_centers = len(state.get('centers', {}).get(power, [])) + military_units = len(state.get('units', {}).get(power, [])) + + territories_per_phase.append(territories) + supply_centers_per_phase.append(supply_centers) + military_units_per_phase.append(military_units) + + # Get relationship data for sentiment calculations + agent_relationships = phase.get('agent_relationships', {}) + if power in agent_relationships: + power_relationships = agent_relationships[power] + + # Calculate sentiment toward others + if power_relationships: + outgoing_values = [self.relationship_values.get(rel, 0) for rel in power_relationships.values()] + if outgoing_values: + sentiment_toward_values.append(statistics.mean(outgoing_values)) + + # Calculate sentiment from others + incoming_values = [] + for other_power, relationships in agent_relationships.items(): + if other_power != power and power in relationships: + incoming_values.append(self.relationship_values.get(relationships[power], 0)) + if incoming_values: + sentiment_from_values.append(statistics.mean(incoming_values)) + + # Calculate relationship stability + if prev_relationships is not None: + stability = self._calculate_relationship_similarity(prev_relationships, power_relationships) + relationship_stability_values.append(stability) + + prev_relationships = power_relationships.copy() + + # === CALCULATE TOTALS === + features['total_phases_active'] = len(power_phases) + + # Calculate total LLM interactions and tokens + message 
composition + total_tokens = 0 + total_responses = 0 + total_ally_msgs = 0 + total_enemy_msgs = 0 + total_neutral_msgs = 0 + total_global_msgs = 0 + total_private_msgs = 0 + + for response in llm_responses: + if response.get('power') != power: + continue + + total_responses += 1 + + # Count tokens for all responses + response_text = str(response.get('raw_response', '')) + word_count = len(response_text.split()) + total_tokens += int(word_count * 1.3) + + # Parse negotiation messages for composition analysis + if response.get('response_type') == 'negotiation_message': + phase_name = response.get('phase') + messages = self._parse_negotiation_messages(response_text, power, phase_name) + + # Get relationships for this phase + phase_data = next((p for p in game_data['phases'] if p['name'] == phase_name), None) + if phase_data: + relationships = self._get_relationships_for_phase(power, phase_name, phase_data) + + for msg in messages: + if msg.get('is_global', False): + total_global_msgs += 1 + else: + total_private_msgs += 1 + + # Categorize by relationship + recipient = msg.get('recipient_power') + normalized_recipient = self._normalize_recipient_name(recipient) + + # Skip self-messages and invalid recipients + if normalized_recipient and normalized_recipient != power and normalized_recipient in relationships: + rel_value = self.relationship_values.get(relationships[normalized_recipient], 0) + if rel_value >= 1: # Friendly or Ally + total_ally_msgs += 1 + elif rel_value <= -1: # Enemy or Unfriendly + total_enemy_msgs += 1 + else: # Neutral + total_neutral_msgs += 1 + + # Calculate total negotiation messages as sum of parsed individual messages + features['total_negotiation_messages_sent'] = total_global_msgs + total_private_msgs + + features['total_llm_interactions'] = total_responses + features['total_response_tokens_estimated'] = total_tokens + features['total_messages_to_allies'] = total_ally_msgs + features['total_messages_to_enemies'] = total_enemy_msgs + features['total_messages_to_neutrals'] = total_neutral_msgs + features['total_global_messages'] = total_global_msgs + features['total_private_messages'] = total_private_msgs + + # === CALCULATE AVERAGES === + if power_phases: + features['avg_negotiation_messages_per_phase'] = features['total_negotiation_messages_sent'] / len(power_phases) + + if territories_per_phase: + features['avg_territories_controlled_per_phase'] = statistics.mean(territories_per_phase) + if supply_centers_per_phase: + features['avg_supply_centers_owned_per_phase'] = statistics.mean(supply_centers_per_phase) + if military_units_per_phase: + features['avg_military_units_per_phase'] = statistics.mean(military_units_per_phase) + + if sentiment_toward_values: + features['avg_sentiment_toward_others'] = statistics.mean(sentiment_toward_values) + if sentiment_from_values: + features['avg_sentiment_from_others'] = statistics.mean(sentiment_from_values) + + if relationship_stability_values: + features['avg_relationship_stability_per_phase'] = statistics.mean(relationship_stability_values) + + if total_responses > 0: + features['avg_response_tokens_per_interaction'] = total_tokens / total_responses + + # Calculate message composition percentages + total_categorized_msgs = total_ally_msgs + total_enemy_msgs + total_neutral_msgs + total_all_msgs = total_global_msgs + total_private_msgs + + if total_categorized_msgs > 0: + features['percent_messages_to_allies_overall'] = (total_ally_msgs / total_categorized_msgs) * 100 + features['percent_messages_to_enemies_overall'] = 
(total_enemy_msgs / total_categorized_msgs) * 100 + + if total_all_msgs > 0: + features['percent_global_vs_private_overall'] = (total_global_msgs / total_all_msgs) * 100 + + # === FAILURE ANALYSIS AGGREGATION (HARD MODE) === + total_calls = 0 + total_failures = 0 + total_successes = 0 + + # Get all responses for this power across all phases/response types + power_responses = [r for r in llm_responses if r.get('power') == power] + + for response in power_responses: + total_calls += 1 + success_status = response.get('success', '').strip() + if self._is_failure_status(success_status): + total_failures += 1 + elif self._is_success_status(success_status): + total_successes += 1 + + features['total_llm_calls_overall'] = total_calls + features['total_failed_llm_calls'] = total_failures + features['total_success_llm_calls'] = total_successes + + if total_calls > 0: + features['overall_failure_rate_percentage'] = (total_failures / total_calls) * 100.0 + features['overall_success_rate_percentage'] = (total_successes / total_calls) * 100.0 + + + # Helper methods + + def _parse_negotiation_messages(self, raw_response: str, sender: str, phase: str) -> List[dict]: + """Parse negotiation messages from raw LLM response.""" + messages = [] + + # Try to extract JSON messages + json_blocks = re.findall(r'```json\s*(\{.*?\})\s*```', raw_response, re.DOTALL) + + if not json_blocks: + # Try to find direct JSON objects + json_blocks = re.findall(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', raw_response) + + for json_str in json_blocks: + try: + msg_data = json.loads(json_str) + if isinstance(msg_data, dict) and 'content' in msg_data: + message = { + 'sender_power': sender, + 'phase': phase, + 'content': msg_data.get('content', ''), + 'is_global': msg_data.get('message_type') == 'global', + 'recipient_power': msg_data.get('recipient') if msg_data.get('message_type') == 'private' else None + } + messages.append(message) + except json.JSONDecodeError: + continue + + return messages + + def _get_model_for_power(self, power: str, llm_responses: List[dict]) -> str: + """Get the model used for a specific power.""" + for response in llm_responses: + if response.get('power') == power: + return response.get('model', 'unknown') + return 'unknown' + + def _get_relationships_for_phase(self, power: str, phase: str, phase_data: dict) -> dict: + """Get relationships for a power in a specific phase.""" + agent_relationships = phase_data.get('agent_relationships', {}) + return agent_relationships.get(power, {}) + + def _get_previous_phase_data(self, current_phase: str, game_data: dict) -> Optional[dict]: + """Get the phase data for the phase before the current one.""" + phases = game_data['phases'] + for i, phase in enumerate(phases): + if phase['name'] == current_phase and i > 0: + return phases[i-1] + return None + + def _calculate_relationship_similarity(self, prev_relationships: dict, current_relationships: dict) -> float: + """Calculate similarity between two relationship dictionaries.""" + if not prev_relationships or not current_relationships: + return 1.0 + + all_powers = set(prev_relationships.keys()) | set(current_relationships.keys()) + if not all_powers: + return 1.0 + + matches = 0 + for power in all_powers: + prev_rel = prev_relationships.get(power, 'Neutral') + curr_rel = current_relationships.get(power, 'Neutral') + if prev_rel == curr_rel: + matches += 1 + + return matches / len(all_powers) + + def _calculate_sentiment_metrics(self, power: str, phase: str, phase_data: dict) -> dict: + """Calculate sentiment metrics for a 
power.""" + + metrics = { + 'avg_sentiment_toward_others': 0.0, + 'avg_sentiment_from_others': 0.0, + 'sentiment_change_from_prev': 0.0 + } + + agent_relationships = phase_data.get('agent_relationships', {}) + if not agent_relationships: + return metrics + + # Calculate how this power perceives others (outgoing sentiment) + power_relationships = agent_relationships.get(power, {}) + if power_relationships: + outgoing_values = [self.relationship_values.get(rel, 0) for rel in power_relationships.values()] + avg_outgoing = statistics.mean(outgoing_values) if outgoing_values else 0 + else: + avg_outgoing = 0 + + # Calculate how others perceive this power (incoming sentiment) + incoming_values = [] + for other_power, relationships in agent_relationships.items(): + if other_power != power and power in relationships: + incoming_values.append(self.relationship_values.get(relationships[power], 0)) + + avg_incoming = statistics.mean(incoming_values) if incoming_values else 0 + + metrics['avg_sentiment_toward_others'] = avg_outgoing + metrics['avg_sentiment_from_others'] = avg_incoming + + return metrics + + def _extract_territory_from_unit(self, unit_str: str) -> str: + """Extract territory name from unit string (e.g., 'A BER' -> 'BER', 'F STP/SC' -> 'STP').""" + parts = unit_str.strip().split() + if len(parts) >= 2: + territory = parts[1] + # Handle special coast notation like 'STP/SC' -> 'STP' + if '/' in territory: + territory = territory.split('/')[0] + return territory + return unit_str + + def _load_csv_as_dicts(self, csv_path: str) -> List[dict]: + """Load CSV file as list of dictionaries.""" + data = [] + with open(csv_path, 'r', encoding='utf-8') as f: + reader = csv.DictReader(f) + for row in reader: + data.append(row) + return data + + def _save_phase_csv(self, phase_features: List[dict], output_path: Path) -> None: + """Save phase-level features to CSV.""" + if not phase_features: + print("No phase features to save") + return + + # Define explicit column order + fieldnames = [ + # === PRIMARY IDENTIFIERS === + 'game_id', + 'llm_model', + 'game_phase', + 'analyzed_response_type', + 'power_name', + + # === RESPONSE INFO === + 'llm_responses_of_this_type', + + # === RESPONSE QUALITY ANALYSIS (HARD MODE) === + 'total_responses_analyzed', + 'failed_responses_count', + 'successful_responses_count', + 'response_failure_rate_percentage', + 'response_success_rate_percentage', + + + # === NEGOTIATION METRICS === + 'total_messages_sent', + 'messages_to_allies', + 'messages_to_enemies', + 'messages_to_neutrals', + 'global_messages_count', + 'private_messages_count', + 'percent_global_messages', + 'percent_messages_to_allies', + 'percent_messages_to_enemies', + 'percent_messages_to_neutrals', + 'average_message_length_chars', + + # === REFLECTION METRICS === + 'llm_response_tokens_estimated', + 'llm_response_time_ms', + 'relationship_stability_vs_prev_phase', + 'avg_sentiment_toward_others', + 'avg_sentiment_from_others', + 'sentiment_change_from_prev', + + # === GAME STATE === + 'territories_controlled_count', + 'supply_centers_owned_count', + 'military_units_count', + 'territories_gained_vs_prev_phase', + 'supply_centers_gained_vs_prev_phase', + 'military_units_gained_vs_prev_phase' + ] + + # Ensure all actual fields are included (in case we missed any) + actual_fields = set() + for row in phase_features: + actual_fields.update(row.keys()) + + # Add any missing fields at the end + for field in sorted(actual_fields): + if field not in fieldnames: + fieldnames.append(field) + + with open(output_path, 
'w', newline='', encoding='utf-8') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(phase_features) + + print(f"Saved {len(phase_features)} phase records to {output_path}") + + def _save_game_csv(self, game_features: List[dict], output_path: Path) -> None: + """Save game-level features to CSV.""" + if not game_features: + print("No game features to save") + return + + # Define explicit column order for game-level CSV + fieldnames = [ + # === PRIMARY IDENTIFIERS === + 'game_id', + 'llm_model', + 'power_name', + + # === FINAL STATE METRICS (End game snapshot) === + 'final_territories_controlled', + 'final_supply_centers_owned', + 'final_military_units', + 'game_result', + 'final_ranking_by_supply_centers', + + # === TOTALS (Complete game sums) === + 'total_negotiation_messages_sent', + 'total_messages_to_allies', + 'total_messages_to_enemies', + 'total_messages_to_neutrals', + 'total_global_messages', + 'total_private_messages', + 'total_response_tokens_estimated', + 'total_llm_interactions', + 'total_phases_active', + + # === FAILURE ANALYSIS TOTALS (HARD MODE) === + 'total_llm_calls_overall', + 'total_failed_llm_calls', + 'total_success_llm_calls', + 'overall_failure_rate_percentage', + 'overall_success_rate_percentage', + + + # === AVERAGES (Behavioral patterns over time) === + 'avg_negotiation_messages_per_phase', + 'avg_relationship_stability_per_phase', + 'avg_sentiment_toward_others', + 'avg_sentiment_from_others', + 'avg_response_tokens_per_interaction', + 'avg_territories_controlled_per_phase', + 'avg_supply_centers_owned_per_phase', + 'avg_military_units_per_phase', + 'percent_messages_to_allies_overall', + 'percent_messages_to_enemies_overall', + 'percent_global_vs_private_overall' + ] + + # Ensure all actual fields are included + actual_fields = set() + for row in game_features: + actual_fields.update(row.keys()) + + # Add any missing fields at the end + for field in sorted(actual_fields): + if field not in fieldnames: + fieldnames.append(field) + + with open(output_path, 'w', newline='', encoding='utf-8') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(game_features) + + print(f"Saved {len(game_features)} game records to {output_path}") + + def _analyze_failures(self, power: str, phase: str, response_type: str, + llm_responses: List[dict]) -> dict: + """Analyze failure patterns for specific power/phase/response_type.""" + responses = [r for r in llm_responses + if r['power'] == power and r['phase'] == phase and r['response_type'] == response_type] + + total_responses = len(responses) + if total_responses == 0: + return { + 'total_responses_analyzed': 0, + 'failed_responses_count': 0, + 'successful_responses_count': 0, + 'response_failure_rate_percentage': 0.0, + 'response_success_rate_percentage': 0.0 + } + + failed_count = 0 + success_count = 0 + + for response in responses: + success_status = response.get('success', '').strip() + if self._is_failure_status(success_status): + failed_count += 1 + elif self._is_success_status(success_status): + success_count += 1 + + return { + 'total_responses_analyzed': total_responses, + 'failed_responses_count': failed_count, + 'successful_responses_count': success_count, + 'response_failure_rate_percentage': (failed_count / total_responses) * 100.0 if total_responses > 0 else 0.0, + 'response_success_rate_percentage': (success_count / total_responses) * 100.0 if total_responses > 0 else 0.0 + } + + def _is_failure_status(self, status: str) 
-> bool: + """Check if status indicates failure.""" + status_lower = status.lower() + return any(indicator in status_lower for indicator in ['false', 'failure:', 'error', 'failed']) + + def _is_success_status(self, status: str) -> bool: + """Check if status indicates success.""" + status_lower = status.lower() + return any(indicator in status_lower for indicator in ['true', 'success:', 'success', 'partial']) + + + +def main(): + """Main entry point for the Statistical Game Analysis tool.""" + parser = argparse.ArgumentParser(description='Statistical Game Analysis for AI Diplomacy') + parser.add_argument('input_path', help='Path to results folder or parent folder containing multiple results') + parser.add_argument('--output', '-o', help='Output directory (default: same as input)') + parser.add_argument('--multiple', '-m', action='store_true', + help='Treat input as parent folder containing multiple results folders') + + args = parser.parse_args() + + analyzer = StatisticalGameAnalyzer() + + try: + if args.multiple: + analyzer.analyze_multiple_folders(args.input_path, args.output) + else: + analyzer.analyze_folder(args.input_path, args.output) + + print("\nAnalysis complete!") + + except Exception as e: + print(f"Error: {e}") + return 1 + + return 0 + + +if __name__ == '__main__': + exit(main()) \ No newline at end of file