AI_Diplomacy/diplomacy_unified_analysis_final.py
AlxAI 9fc25f2fec Add comprehensive Diplomacy analysis with visualizations
- Added diplomacy_unified_analysis_final.py: Complete analysis script with CSV-only approach
- Added DIPLOMACY_ANALYSIS_DOCUMENTATION.md: Comprehensive project documentation
- Added visualization_experiments_log.md: Detailed development history
- Added visualization_results/: AAAI-quality visualizations showing model evolution
- Fixed old format success calculation bug (results keyed by unit location)
- Demonstrated AI evolution from passive to active play across 61 models
- Updated .gitignore to exclude results_alpha

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-27 13:29:29 -04:00

1411 lines
No EOL
55 KiB
Python

#!/usr/bin/env python3
"""
Enhanced CSV-Only Diplomacy Model Analysis Script
- Uses ONLY CSV data as the source of truth
- No JSON parsing that can mistake messages for model names
- Includes comprehensive visualization suite
- Proper scaling and ordering of visualizations
"""
import json
import sys
import csv
from pathlib import Path
from collections import defaultdict
from datetime import datetime, timedelta
import argparse
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import stats
# Increase CSV field size limit: llm_responses.csv rows can contain very large
# free-text fields. csv.field_size_limit(sys.maxsize) raises OverflowError on
# platforms where the underlying C long is narrower than sys.maxsize (notably
# 64-bit Windows), so back off by halving until a value is accepted.
_max_field_size = sys.maxsize
while True:
    try:
        csv.field_size_limit(_max_field_size)
        break
    except OverflowError:
        _max_field_size //= 2
# AAAI publication quality styling.
# Applied globally via rcParams so every figure produced below shares the
# same fonts, DPI, line weights and spine/grid conventions.
plt.rcParams.update({
    'font.size': 12,
    'axes.titlesize': 14,
    'axes.labelsize': 13,
    'xtick.labelsize': 11,
    'ytick.labelsize': 11,
    'legend.fontsize': 11,
    'figure.dpi': 150,      # on-screen rendering resolution
    'savefig.dpi': 300,     # print/publication resolution for saved files
    'font.family': 'sans-serif',
    'axes.linewidth': 1.5,
    'lines.linewidth': 2.5,
    'lines.markersize': 8,
    'grid.alpha': 0.3,
    'axes.grid': True,
    'axes.spines.top': False,    # open top/right spines for a cleaner look
    'axes.spines.right': False,
    'figure.figsize': (10, 6),  # Default single figure size
})
# Color schemes.
# Fixed hex colors keyed by order type / outcome so every chart below uses a
# consistent palette (colorblind-friendly hues for the order types).
COLORS = {
    'hold': '#808080',     # Gray
    'move': '#2E5090',     # Deep Blue
    'support': '#009E73',  # Green
    'convoy': '#CC79A7',   # Purple
    'active': '#D55E00',   # Orange for active orders
    'success': '#2ECC71',  # Success Green
    'failure': '#E74C3C',  # Failure Red
}
def get_year_from_phase_name(phase_name):
    """Extract the game year from a phase name (e.g. 'S1901M' -> 1901).

    Phase names encode season, four-digit year, and phase type; characters
    1-4 are the year.

    Args:
        phase_name: Phase identifier string such as 'S1901M'.

    Returns:
        The year as an int, or None when the name is too short or the year
        characters are not numeric.
    """
    if len(phase_name) >= 5:
        try:
            return int(phase_name[1:5])
        except ValueError:  # non-numeric year characters; was a bare except
            return None
    return None
def get_decade_bin(year):
    """Return the decade bucket label for *year* (e.g. 1903 -> '1900-1910').

    Returns None when *year* is None so callers can pass through the result
    of get_year_from_phase_name() unchecked.
    """
    if year is None:
        return None
    start = year - (year % 10)
    return f"{start}-{start + 10}"
def extract_models_from_csv(game_dir):
    """Collect the set of model names recorded in a game's llm_responses.csv.

    The CSV is treated as the single source of truth for which models took
    part; the game JSON is deliberately not consulted here.

    Args:
        game_dir: Path to the game directory containing llm_responses.csv.

    Returns:
        Set of cleaned (stripped) model name strings; empty when the CSV is
        missing or unreadable.
    """
    found = set()
    csv_path = game_dir / "llm_responses.csv"
    if not csv_path.exists():
        return found
    try:
        print(f" Reading CSV file: {csv_path}")
        # Count data rows up front, purely for progress reporting.
        with open(csv_path, 'r', encoding='utf-8', errors='ignore') as handle:
            total_rows = sum(1 for _ in handle) - 1  # minus the header line
        print(f" Total rows: {total_rows}")
        # Only the 'model' column is needed to enumerate participants.
        frame = pd.read_csv(csv_path, usecols=['model'])
        if 'model' in frame.columns:
            distinct = frame['model'].dropna().unique()
            found.update(
                str(name).strip()
                for name in distinct
                if name and str(name).strip() and str(name) != 'model'
            )
            print(f" Found {len(distinct)} unique models in CSV")
    except Exception as exc:
        print(f" Error reading CSV: {exc}")
    return found
def analyze_game(game_file_path):
    """Analyze a single game using the CSV for model-power-phase mappings.

    Combines the game JSON (orders, results, unit positions) with
    llm_responses.csv (which model controlled which power in each phase) and
    produces per-model, per-phase order statistics aggregated across powers.

    Args:
        game_file_path: Path to the game's JSON file; llm_responses.csv is
            expected in the same directory.

    Returns:
        Dict with 'game_id', 'timestamp', 'all_models', 'power_models' and
        'phase_data' (model -> list of per-phase stat dicts). A partial
        result is returned when the JSON or CSV is missing/unreadable.
    """
    game_dir = game_file_path.parent
    game_timestamp = datetime.fromtimestamp(game_file_path.stat().st_mtime)
    print(f"\nAnalyzing game: {game_dir.name}")
    # Get all models from CSV only - the CSV is the source of truth
    all_models = extract_models_from_csv(game_dir)
    result = {
        'game_id': game_dir.name,
        'timestamp': game_timestamp,
        'all_models': list(all_models),
        'power_models': {},  # built from the CSV below
        'phase_data': defaultdict(list)
    }
    # Load game data; on any read/parse failure return the partial result.
    try:
        with open(game_file_path, 'r') as f:
            game_data = json.load(f)
    except (OSError, ValueError):  # ValueError covers json.JSONDecodeError
        return result
    csv_file = game_dir / "llm_responses.csv"
    if not csv_file.exists():
        return result
    try:
        # Read the CSV with phase, power, and model columns
        df = pd.read_csv(csv_file, usecols=['phase', 'power', 'model'])
        for phase in game_data.get('phases', []):
            phase_name = phase.get('name', '')
            # Only movement phases (names ending in 'M') carry scored orders.
            if not phase_name.endswith('M'):
                continue
            year = get_year_from_phase_name(phase_name)
            decade = get_decade_bin(year)
            # Unit counts come from the phase state snapshot
            phase_state = phase.get('state', {})
            phase_units = phase_state.get('units', {})
            # All CSV rows for this phase
            phase_df = df[df['phase'] == phase_name]
            # Per-model aggregate for this phase (summed over all powers the
            # model controlled)
            model_phase_data = defaultdict(lambda: {
                'phase_name': phase_name,
                'year': year,
                'decade': decade,
                'power': 'AGGREGATE',  # Aggregating across all powers
                'game_id': game_dir.name,
                'total_orders': 0,
                'order_counts': {'hold': 0, 'move': 0, 'support': 0, 'convoy': 0},
                'order_successes': {'hold': 0, 'move': 0, 'support': 0, 'convoy': 0},
                'unit_count': 0,
            })
            for power in ['AUSTRIA', 'ENGLAND', 'FRANCE', 'GERMANY', 'ITALY', 'RUSSIA', 'TURKEY']:
                # Resolve which model controlled this power in this phase
                power_phase_df = phase_df[phase_df['power'] == power]
                if len(power_phase_df) == 0:
                    continue
                model = power_phase_df.iloc[0]['model']
                if pd.isna(model):
                    continue
                model = str(model).strip()
                result['power_models'][power] = model
                # Count units for this power
                unit_count = len(phase_units.get(power, []))
                model_phase_data[model]['unit_count'] += unit_count
                # New format: explicit per-type order results
                if 'order_results' in phase and power in phase.get('order_results', {}):
                    power_orders = phase['order_results'][power]
                    for order_type in ['hold', 'move', 'support', 'convoy']:
                        orders = power_orders.get(order_type, [])
                        count = len(orders)
                        success_count = sum(1 for order in orders if order.get('result', '') == 'success')
                        model_phase_data[model]['order_counts'][order_type] += count
                        model_phase_data[model]['order_successes'][order_type] += success_count
                        model_phase_data[model]['total_orders'] += count
                # Old format: raw order strings plus a results dict
                elif 'orders' in phase and power in phase.get('orders', {}):
                    order_list = phase['orders'][power]
                    results_dict = phase.get('results', {})
                    if order_list:
                        for order_str in order_list:
                            order_str = order_str.strip()
                            # Unit location key, e.g. "A PAR - PIC" -> "A PAR";
                            # used to look up the order result below.
                            unit_loc = None
                            parts = order_str.split(' ')
                            if len(parts) >= 2 and parts[0] in ['A', 'F']:
                                unit_loc = f"{parts[0]} {parts[1]}"
                            # Classify the order. Markers are checked from most
                            # to least specific: support (' S ') and convoy
                            # (' C ') orders also contain ' - ', so they must be
                            # tested BEFORE move. The previous version tested a
                            # bare ' H' substring first, which misclassified any
                            # order touching an H-province (e.g. "A HOL - BEL")
                            # as a hold, and tested ' - ' before ' S ', which
                            # misclassified support-of-move orders as moves.
                            if ' S ' in order_str:
                                order_type = 'support'
                            elif ' C ' in order_str:
                                order_type = 'convoy'
                            elif ' - ' in order_str:
                                order_type = 'move'
                            else:
                                order_type = 'hold'
                            model_phase_data[model]['order_counts'][order_type] += 1
                            model_phase_data[model]['total_orders'] += 1
                            # Old-format success semantics: an empty list, empty
                            # string or None result means the order succeeded;
                            # any other value encodes a bounce/dislodge/void.
                            if unit_loc and unit_loc in results_dict:
                                result_value = results_dict[unit_loc]
                                if isinstance(result_value, list) and len(result_value) == 0:
                                    model_phase_data[model]['order_successes'][order_type] += 1
                                elif isinstance(result_value, str) and result_value == "":
                                    model_phase_data[model]['order_successes'][order_type] += 1
                                elif result_value is None:
                                    model_phase_data[model]['order_successes'][order_type] += 1
            # Fold this phase's per-model aggregates into the game result
            for model, stats in model_phase_data.items():
                result['phase_data'][model].append(stats)
    except Exception as e:
        print(f"Error processing CSV for {game_dir.name}: {e}")
    return result
def create_comprehensive_charts(all_data, output_dir):
    """Create all visualization charts.

    Aggregates per-game, per-phase statistics into per-model totals, derives
    summary metrics (active-order %, per-type and active-only success rates),
    then drives every chart-producing function in turn. Also writes
    analysis_metadata.json and a summary report into output_dir.

    Args:
        all_data: List of per-game result dicts as produced by analyze_game().
        output_dir: Path-like directory where images and metadata are written.

    Returns:
        The aggregated model_stats mapping (model name -> stats dict).
    """
    # Aggregate model statistics; the defaultdict factory defines the full
    # per-model record shape up front.
    model_stats = defaultdict(lambda: {
        'games_participated': set(),
        'total_phases': 0,
        'total_orders': 0,
        'order_counts': {'hold': 0, 'move': 0, 'support': 0, 'convoy': 0},
        'order_successes': {'hold': 0, 'move': 0, 'support': 0, 'convoy': 0},
        'powers_played': defaultdict(int),
        'decade_distribution': defaultdict(int),
        'phase_details': [],
        'unit_counts': [],  # List of unit counts across all phases
        'unit_count_distribution': defaultdict(int)  # How many phases with X units
    })
    # First pass: collect all models mentioned anywhere
    all_models_found = set()
    models_missing_phases = defaultdict(set)  # Track which games have models without phases
    for game_data in all_data:
        all_models_found.update(game_data['all_models'])
        # Track models that appear in games but not in phase data
        models_in_phase_data = set(game_data['phase_data'].keys())
        models_missing = set(game_data['all_models']) - models_in_phase_data
        for model in models_missing:
            models_missing_phases[model].add(game_data['game_id'])
    # Second pass: aggregate phase data
    for game_data in all_data:
        game_id = game_data['game_id']
        # Track game participation for all models in game
        for model in game_data['all_models']:
            model_stats[model]['games_participated'].add(game_id)
        # Process phase data
        for model, phases in game_data['phase_data'].items():
            for phase in phases:
                model_stats[model]['total_phases'] += 1
                model_stats[model]['total_orders'] += phase['total_orders']
                model_stats[model]['powers_played'][phase['power']] += 1
                model_stats[model]['phase_details'].append(phase)
                # Track unit counts (zero-unit phases are skipped)
                unit_count = phase.get('unit_count', 0)
                if unit_count > 0:
                    model_stats[model]['unit_counts'].append(unit_count)
                    model_stats[model]['unit_count_distribution'][unit_count] += 1
                if phase['decade']:
                    model_stats[model]['decade_distribution'][phase['decade']] += 1
                # Aggregate order counts and successes
                for order_type in ['hold', 'move', 'support', 'convoy']:
                    model_stats[model]['order_counts'][order_type] += phase['order_counts'][order_type]
                    model_stats[model]['order_successes'][order_type] += phase['order_successes'][order_type]
    # Calculate derived metrics
    for model, stats in model_stats.items():
        # Active order percentage: any non-hold order counts as "active"
        total = stats['total_orders']
        active = total - stats['order_counts']['hold'] if total > 0 else 0
        stats['active_percentage'] = (active / total * 100) if total > 0 else 0
        # Success rates per order type
        stats['success_rates'] = {}
        for order_type in ['hold', 'move', 'support', 'convoy']:
            count = stats['order_counts'][order_type]
            success = stats['order_successes'][order_type]
            stats['success_rates'][order_type] = (success / count * 100) if count > 0 else 0
        # Overall success rate on active orders (excluding holds)
        total_active = sum(stats['order_counts'][t] for t in ['move', 'support', 'convoy'])
        total_active_success = sum(stats['order_successes'][t] for t in ['move', 'support', 'convoy'])
        stats['active_success_rate'] = (total_active_success / total_active * 100) if total_active > 0 else 0
    # Create visualizations
    print("\nCreating comprehensive visualizations...")
    # 1. High-quality models analysis (must come first)
    create_high_quality_models_chart(model_stats, output_dir)
    # 2. Success rates charts
    create_success_rates_charts(model_stats, output_dir, all_models_found)
    # 3. Active order percentage charts
    create_active_order_percentage_charts(model_stats, output_dir)
    # 4. Order distribution charts
    create_order_distribution_charts(model_stats, output_dir)
    # 5. Temporal analysis
    create_temporal_analysis(model_stats, output_dir)
    # 6. Power distribution analysis
    create_power_distribution_analysis(model_stats, output_dir)
    # 7. Physical dates timeline
    create_physical_dates_timeline(all_data, model_stats, output_dir)
    # 8. Phase and game counts
    create_phase_game_counts(model_stats, output_dir)
    # 9. Model comparison heatmap
    create_comparison_heatmap(model_stats, output_dir)
    # 10. Unit control analysis
    create_unit_control_analysis(model_stats, output_dir)
    # 11. Success over physical time
    create_success_over_physical_time(all_data, model_stats, output_dir)
    # 12. Model evolution chart
    create_model_evolution_chart(all_data, model_stats, output_dir)
    # Save comprehensive analysis metadata
    save_metadata = {
        'total_games': len(all_data),
        'total_unique_models': len(all_models_found),
        'models_with_phase_data': len([m for m in model_stats if model_stats[m]['total_phases'] > 0]),
        'models_without_phase_data': len(models_missing_phases),
        'models_with_active_orders': len([m for m in model_stats if model_stats[m]['active_percentage'] > 0]),
        'timestamp': datetime.now().isoformat()
    }
    with open(output_dir / 'analysis_metadata.json', 'w') as f:
        json.dump(save_metadata, f, indent=2)
    # Create summary report
    create_summary_report(model_stats, all_models_found, models_missing_phases, output_dir)
    return model_stats
def create_high_quality_models_chart(model_stats, output_dir):
    """Create focused visualization for models with substantial gameplay data.

    Filters to models with at least 500 non-hold ("active") orders and 200
    phases, then draws a two-panel figure: success rate on active orders
    (left) and the move/support/convoy composition of those orders (right),
    for the top 20 by success rate. Saved as 00_high_quality_models.png so
    it sorts first in the output directory.
    """
    # Filter for models with meaningful data
    high_quality_models = []
    for model, stats in model_stats.items():
        total_orders = stats.get('total_orders', 0)
        non_hold_orders = total_orders - stats.get('order_counts', {}).get('hold', 0)
        phases = stats.get('total_phases', 0)
        # Only include models with substantial active gameplay
        if non_hold_orders >= 500 and phases >= 200:
            non_hold_successes = sum(stats.get('order_successes', {}).get(t, 0)
                                     for t in ['move', 'support', 'convoy'])
            success_rate = (non_hold_successes / non_hold_orders * 100) if non_hold_orders > 0 else 0
            active_percentage = (non_hold_orders / total_orders * 100)
            high_quality_models.append({
                'model': model,
                'phases': phases,
                'games': len(stats.get('games_participated', set())),
                'success_rate': success_rate,
                'active_percentage': active_percentage,
                'non_hold_orders': non_hold_orders,
                'move_rate': stats['order_counts']['move'] / total_orders * 100,
                'support_rate': stats['order_counts']['support'] / total_orders * 100,
                'convoy_rate': stats['order_counts']['convoy'] / total_orders * 100
            })
    if not high_quality_models:
        print("No high-quality models found with 500+ active orders and 200+ phases")
        return
    # Sort by success rate
    high_quality_models.sort(key=lambda x: x['success_rate'], reverse=True)
    print(f"\nHigh-Quality Models: {len(high_quality_models)} models with 500+ active orders and 200+ phases")
    # Create visualization
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 10))
    # Left chart: Success rates
    model_names = []
    success_rates = []
    active_percentages = []
    for data in high_quality_models[:20]:  # Top 20
        # Shorten provider-prefixed names ("org/model") for readability
        model_display = data['model'].split('/')[-1] if '/' in data['model'] else data['model']
        model_display = model_display[:30]
        model_names.append(f"{model_display} ({data['phases']}p)")
        success_rates.append(data['success_rate'])
        active_percentages.append(data['active_percentage'])
    y_pos = np.arange(len(model_names))
    bars1 = ax1.barh(y_pos, success_rates, color=COLORS['success'], alpha=0.8)
    # Add value labels
    for i, (bar, rate) in enumerate(zip(bars1, success_rates)):
        ax1.text(bar.get_width() + 0.5, bar.get_y() + bar.get_height()/2,
                 f'{rate:.1f}%', va='center', fontsize=9)
    ax1.set_yticks(y_pos)
    ax1.set_yticklabels(model_names, fontsize=10)
    ax1.set_xlabel('Success Rate on Active Orders (%)', fontsize=12)
    ax1.set_title('Top Performing Models\n(500+ active orders, 200+ phases)', fontsize=14, fontweight='bold')
    ax1.axvline(x=50, color='red', linestyle='--', alpha=0.5, label='50% baseline')
    ax1.set_xlim(35, 70)
    # Right chart: Active order composition (stacked horizontal bars,
    # same row order as the left chart)
    move_rates = [d['move_rate'] for d in high_quality_models[:20]]
    support_rates = [d['support_rate'] for d in high_quality_models[:20]]
    convoy_rates = [d['convoy_rate'] for d in high_quality_models[:20]]
    x = np.arange(len(model_names))
    width = 0.8
    bars_move = ax2.barh(x, move_rates, width, label='Move', color=COLORS['move'], alpha=0.8)
    bars_support = ax2.barh(x, support_rates, width, left=move_rates, label='Support', color=COLORS['support'], alpha=0.8)
    bars_convoy = ax2.barh(x, convoy_rates, width,
                           left=[m+s for m,s in zip(move_rates, support_rates)],
                           label='Convoy', color=COLORS['convoy'], alpha=0.8)
    ax2.set_yticks(x)
    ax2.set_yticklabels([])  # Hide labels on right chart
    ax2.set_xlabel('Order Type Distribution (%)', fontsize=12)
    ax2.set_title('Active Order Composition', fontsize=14, fontweight='bold')
    ax2.legend(loc='lower right')
    ax2.set_xlim(0, 100)
    plt.suptitle(f'High-Quality Model Analysis\n{len(high_quality_models)} models with substantial active gameplay',
                 fontsize=16, fontweight='bold')
    plt.tight_layout()
    fig.savefig(output_dir / '00_high_quality_models.png', dpi=300, bbox_inches='tight')
    plt.close()
def create_success_rates_charts(model_stats, output_dir, all_models_found):
    """Create success rate charts for all models.

    Draws one horizontal bar per model with phase data, showing the success
    rate on active (move/support/convoy) orders; bars are green above 50%
    and red otherwise. Saved as all_models_success_rates.png.

    NOTE(review): all_models_found is accepted but not referenced anywhere
    in this function — kept for caller compatibility; confirm before
    removing it from the signature.
    """
    # Filter to models with actual phase data and calculate success rates
    models_with_data = []
    for model, stats in model_stats.items():
        if stats['total_phases'] > 0:
            # Calculate success rate on active orders only
            active_orders = sum(stats['order_counts'][t] for t in ['move', 'support', 'convoy'])
            active_successes = sum(stats['order_successes'][t] for t in ['move', 'support', 'convoy'])
            if active_orders > 0:
                success_rate = (active_successes / active_orders * 100)
            else:
                success_rate = 0
            models_with_data.append({
                'model': model,
                'success_rate': success_rate,
                'active_orders': active_orders,
                'total_phases': stats['total_phases'],
                'active_percentage': stats['active_percentage']
            })
    if not models_with_data:
        print("No models with phase data found!")
        return
    # Sort by total active orders (to show most active models first)
    models_with_data.sort(key=lambda x: x['active_orders'], reverse=True)
    # Create the main success rates chart; height scales with model count
    fig, ax = plt.subplots(figsize=(16, max(10, len(models_with_data) * 0.25)))
    models = []
    success_rates = []
    colors = []
    for data in models_with_data:
        models.append(data['model'])
        success_rates.append(data['success_rate'])
        # Color based on success rate
        if data['success_rate'] > 50:
            colors.append(COLORS['success'])
        else:
            colors.append(COLORS['failure'])
    if models:
        y_pos = np.arange(len(models))
        # Create horizontal bars
        bars = ax.barh(y_pos, success_rates, color=colors)
        # Add value labels
        for i, (bar, rate, data) in enumerate(zip(bars, success_rates, models_with_data)):
            # Add success rate
            if rate > 0 or data['active_orders'] > 0:
                ax.text(bar.get_width() + 1, bar.get_y() + bar.get_height()/2,
                        f'{rate:.1f}%\n({data["active_orders"]} active)',
                        va='center', fontsize=8)
            else:
                # Models with no active orders get an explanatory gray label
                ax.text(1, bar.get_y() + bar.get_height()/2,
                        f'0.0%\n({data["total_phases"]} phases)\n{data["active_percentage"]:.0f}% active',
                        va='center', fontsize=8, color='gray')
        ax.set_yticks(y_pos)
        ax.set_yticklabels(models, fontsize=10)
        ax.set_xlabel('Active Order Success Rate (%)', fontsize=12)
        ax.set_title(f'Success Rates on Active Orders - {len(models)} Models', fontsize=14)
        ax.axvline(x=50, color='red', linestyle='--', alpha=0.5)
        ax.grid(True, alpha=0.3)
        ax.set_xlim(0, 100)
    plt.tight_layout()
    plt.savefig(output_dir / 'all_models_success_rates.png', dpi=300, bbox_inches='tight')
    plt.close()
def create_active_order_percentage_charts(model_stats, output_dir):
    """Plot each model's share of non-hold ("active") orders.

    One horizontal bar per model that issued at least one order, sorted from
    most to least active, colored on a red-to-green gradient. Saved as
    all_models_active_percentage.png.
    """
    # Keep only models that actually issued orders.
    entries = [
        {
            'model': name,
            'active_percentage': info['active_percentage'],
            'total_orders': info['total_orders'],
            'total_phases': info['total_phases'],
        }
        for name, info in model_stats.items()
        if info['total_orders'] > 0
    ]
    if not entries:
        return
    # Most active models at the top of the chart.
    entries.sort(key=lambda e: e['active_percentage'], reverse=True)
    fig, ax = plt.subplots(figsize=(16, max(10, len(entries) * 0.25)))
    names = [e['model'] for e in entries]
    pcts = [e['active_percentage'] for e in entries]
    order_totals = [e['total_orders'] for e in entries]
    if names:
        positions = np.arange(len(names))
        # Gradient keyed on activity level: red (passive) to green (active).
        bar_colors = plt.cm.RdYlGn(np.array(pcts) / 100)
        bars = ax.barh(positions, pcts, color=bar_colors)
        # Annotate each bar with its percentage and raw order count.
        for bar, pct, orders in zip(bars, pcts, order_totals):
            ax.text(bar.get_width() + 1, bar.get_y() + bar.get_height()/2,
                    f'{pct:.1f}%\n({orders} orders)',
                    va='center', fontsize=8)
        ax.set_yticks(positions)
        ax.set_yticklabels(names, fontsize=10)
        ax.set_xlabel('Active Order Percentage (%)', fontsize=12)
        ax.set_title(f'Active Order Percentage by Model - Sorted by Activity Level', fontsize=14)
        ax.grid(True, alpha=0.3)
        ax.set_xlim(0, 100)
        # Reference line at 50% (label only shows if a legend is drawn).
        ax.axvline(x=50, color='black', linestyle='--', alpha=0.5, label='50% threshold')
    plt.tight_layout()
    plt.savefig(output_dir / 'all_models_active_percentage.png', dpi=300, bbox_inches='tight')
    plt.close()
def create_order_distribution_charts(model_stats, output_dir):
    """Create order distribution heatmap.

    For up to the 50 most active models, shows what percentage of each
    model's orders fall into hold/move/support/convoy. Saved as
    all_models_order_distribution.png.
    """
    # Filter models with orders
    models_with_orders = []
    for model, stats in model_stats.items():
        if stats['total_orders'] > 0:
            models_with_orders.append((model, stats))
    if not models_with_orders:
        return
    # Sort by total orders
    models_with_orders.sort(key=lambda x: x[1]['total_orders'], reverse=True)
    # Take top models that fit well in visualization
    max_models = min(50, len(models_with_orders))
    models_with_orders = models_with_orders[:max_models]
    fig, ax = plt.subplots(figsize=(12, max(10, len(models_with_orders) * 0.3)))
    # Prepare data for heatmap: one row per model, one column per order type
    order_types = ['hold', 'move', 'support', 'convoy']
    heatmap_data = []
    model_names = []
    for model, stats in models_with_orders:
        model_names.append(model)
        row = []
        for order_type in order_types:
            pct = (stats['order_counts'][order_type] / stats['total_orders'] * 100)
            row.append(pct)
        heatmap_data.append(row)
    if heatmap_data:
        # Create heatmap with per-cell percentage annotations
        sns.heatmap(heatmap_data,
                    xticklabels=order_types,
                    yticklabels=model_names,
                    annot=True, fmt='.1f',
                    cmap='YlOrRd',
                    cbar_kws={'label': 'Percentage of Orders (%)'},
                    ax=ax)
    ax.set_title('Order Type Distribution by Model', fontsize=14)
    ax.set_xlabel('Order Type', fontsize=12)
    ax.set_ylabel('Model', fontsize=12)
    plt.tight_layout()
    plt.savefig(output_dir / 'all_models_order_distribution.png', dpi=300, bbox_inches='tight')
    plt.close()
def create_temporal_analysis(model_stats, output_dir):
    """Create temporal analysis by decade.

    For up to 20 models with at least 50 phases, draws one small bar chart
    each showing overall order success rate per in-game decade. Saved as
    temporal_analysis_decades.png.
    """
    # Get models with temporal data and enough phases to be meaningful
    models_with_decades = []
    for model, stats in model_stats.items():
        if stats['decade_distribution'] and stats['total_phases'] >= 50:
            models_with_decades.append((model, stats))
    if not models_with_decades:
        print("No models with sufficient temporal data found")
        return
    models_with_decades.sort(key=lambda x: x[1]['total_phases'], reverse=True)
    # Take top models for clarity
    max_models = min(20, len(models_with_decades))
    models_with_decades = models_with_decades[:max_models]
    # Calculate grid dimensions: 4 columns, as many rows as needed
    cols = 4
    rows = (max_models + cols - 1) // cols
    fig, axes = plt.subplots(rows, cols, figsize=(20, 5 * rows))
    if rows == 1:
        # With a single row, subplots() returns a 1-D array; normalize to 2-D
        axes = axes.reshape(1, -1)
    axes = axes.flatten()
    for idx, (model, stats) in enumerate(models_with_decades):
        ax = axes[idx]
        # Calculate success rates by decade (all order types combined,
        # holds included)
        decade_success = {}
        for phase in stats['phase_details']:
            if phase['decade']:
                if phase['decade'] not in decade_success:
                    decade_success[phase['decade']] = {'orders': 0, 'successes': 0}
                decade_success[phase['decade']]['orders'] += phase['total_orders']
                decade_success[phase['decade']]['successes'] += sum(phase['order_successes'].values())
        if not decade_success:
            ax.set_visible(False)
            continue
        decades = sorted(decade_success.keys())
        success_rates = []
        for decade in decades:
            data = decade_success[decade]
            rate = (data['successes'] / data['orders'] * 100) if data['orders'] > 0 else 0
            success_rates.append(rate)
        # Create bar chart
        x = range(len(decades))
        bars = ax.bar(x, success_rates, color=COLORS['move'], alpha=0.8)
        # Add value labels
        for i, (bar, rate) in enumerate(zip(bars, success_rates)):
            ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
                    f'{rate:.0f}%', ha='center', va='bottom', fontsize=8)
        ax.set_xticks(x)
        # Label ticks with the decade's starting year only ('1900-1910' -> '1900')
        ax.set_xticklabels([d.split('-')[0] for d in decades], rotation=45)
        ax.set_ylim(0, 100)
        ax.axhline(y=50, color='red', linestyle='--', alpha=0.3)
        ax.set_ylabel('Success Rate (%)')
        ax.set_title(f'{model}\n({stats["total_phases"]} phases)', fontsize=10)
        ax.grid(True, alpha=0.3)
    # Hide unused subplots
    for idx in range(max_models, len(axes)):
        axes[idx].set_visible(False)
    fig.suptitle('Temporal Success Analysis by Decade', fontsize=16, fontweight='bold')
    plt.tight_layout()
    fig.savefig(output_dir / 'temporal_analysis_decades.png', dpi=300, bbox_inches='tight')
    plt.close()
def create_power_distribution_analysis(model_stats, output_dir):
    """Create power distribution analysis.

    Draws a heatmap of how often each model (with at least 50 phases, top 30
    by phase count) played each of the seven great powers, as a percentage
    of that model's phases. Saved as power_distribution_heatmap.png.
    """
    # Get models with power data and enough phases to be meaningful
    models_with_powers = []
    for model, stats in model_stats.items():
        if stats['powers_played'] and stats['total_phases'] >= 50:
            models_with_powers.append((model, stats))
    if not models_with_powers:
        return
    models_with_powers.sort(key=lambda x: x[1]['total_phases'], reverse=True)
    max_models = min(30, len(models_with_powers))
    fig, ax = plt.subplots(figsize=(14, max(10, max_models * 0.4)))
    # Prepare data: one heatmap column per power, one row per model.
    # (A per-power color table previously defined here was never used by
    # the heatmap and has been removed as dead code.)
    powers = ['AUSTRIA', 'ENGLAND', 'FRANCE', 'GERMANY', 'ITALY', 'RUSSIA', 'TURKEY']
    heatmap_data = []
    model_names = []
    for model, stats in models_with_powers[:max_models]:
        model_names.append(model)
        row = []
        total_power_phases = sum(stats['powers_played'].values())
        for power in powers:
            count = stats['powers_played'].get(power, 0)
            pct = (count / total_power_phases * 100) if total_power_phases > 0 else 0
            row.append(pct)
        heatmap_data.append(row)
    if heatmap_data:
        # Create heatmap with per-cell percentage annotations
        sns.heatmap(heatmap_data,
                    xticklabels=powers,
                    yticklabels=model_names,
                    annot=True, fmt='.0f',
                    cmap='Blues',
                    cbar_kws={'label': 'Percentage of Phases (%)'},
                    ax=ax)
    ax.set_title('Power Distribution by Model', fontsize=14)
    ax.set_xlabel('Power', fontsize=12)
    ax.set_ylabel('Model', fontsize=12)
    plt.tight_layout()
    plt.savefig(output_dir / 'power_distribution_heatmap.png', dpi=300, bbox_inches='tight')
    plt.close()
def create_physical_dates_timeline(all_data, model_stats, output_dir):
    """Create timeline showing model activity over actual (wall-clock) dates.

    Dates are taken from a leading YYYYMMDD prefix in the game id when
    present, otherwise from the game's file timestamp. The ten most active
    models are plotted as games-per-day lines. Saved as
    physical_dates_timeline.png.

    NOTE(review): model_stats is accepted but unused here — kept for
    signature compatibility with the other chart functions.
    """
    # date -> model -> number of games played that day
    date_model_activity = defaultdict(lambda: defaultdict(int))
    for game_data in all_data:
        game_id = game_data['game_id']
        game_date = None
        # Preferred source: a YYYYMMDD prefix embedded in the game id
        if len(game_id) >= 8 and game_id[:8].isdigit():
            try:
                game_date = datetime.strptime(game_id[:8], '%Y%m%d').date()
            except ValueError:
                # Eight digits but not a valid calendar date; fall through
                # to the timestamp fallback (was a bare except).
                pass
        if not game_date:
            # Fallback: the game file's modification timestamp
            if 'timestamp' in game_data:
                game_date = game_data['timestamp'].date()
        if game_date:
            for model in game_data['all_models']:
                date_model_activity[game_date][model] += 1
    if not date_model_activity:
        print("No date information found in game data")
        return
    # Get the top 10 models by total activity across all dates
    model_totals = defaultdict(int)
    for date_data in date_model_activity.values():
        for model, count in date_data.items():
            model_totals[model] += count
    top_models = sorted(model_totals.items(), key=lambda x: x[1], reverse=True)[:10]
    top_model_names = [m[0] for m in top_models]
    # Prepare data for plotting
    dates = sorted(date_model_activity.keys())
    fig, ax = plt.subplots(figsize=(16, 8))
    for model in top_model_names:
        # Only plot days on which this model actually appeared
        model_dates = []
        model_counts = []
        for date in dates:
            if model in date_model_activity[date]:
                model_dates.append(date)
                model_counts.append(date_model_activity[date][model])
        if model_dates:
            ax.plot(model_dates, model_counts, marker='o', label=model, alpha=0.7)
    ax.set_xlabel('Date', fontsize=12)
    ax.set_ylabel('Games per Day', fontsize=12)
    ax.set_title('Model Activity Timeline', fontsize=14, fontweight='bold')
    ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    ax.grid(True, alpha=0.3)
    # Format x-axis with explicit dates, one major tick per month
    import matplotlib.dates as mdates
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    ax.xaxis.set_major_locator(mdates.MonthLocator())
    plt.xticks(rotation=45)
    plt.tight_layout()
    fig.savefig(output_dir / 'physical_dates_timeline.png', dpi=300, bbox_inches='tight')
    plt.close()
def create_phase_game_counts(model_stats, output_dir):
    """Create phase and game count comparison.

    Grouped bar chart (log-scale y-axis) of phase count vs. game count for
    the top 40 models by phases. Saved as phase_game_counts.png.
    """
    # Get models with at least one game
    models_with_games = [(m, s) for m, s in model_stats.items()
                         if len(s['games_participated']) > 0]
    if not models_with_games:
        return
    # Sort by phases first, game count as tie-breaker
    models_with_games.sort(key=lambda x: (x[1]['total_phases'], len(x[1]['games_participated'])),
                           reverse=True)
    # Take top models
    max_models = min(40, len(models_with_games))
    models_with_games = models_with_games[:max_models]
    fig, ax = plt.subplots(figsize=(14, 10))
    model_names = []
    phase_counts = []
    game_counts = []
    for model, stats in models_with_games:
        model_names.append(model)
        phase_counts.append(stats['total_phases'])
        game_counts.append(len(stats['games_participated']))
    x = np.arange(len(model_names))
    width = 0.35
    bars1 = ax.bar(x - width/2, phase_counts, width, label='Phases', color=COLORS['move'])
    bars2 = ax.bar(x + width/2, game_counts, width, label='Games', color=COLORS['support'])
    # Add value labels for significant values (rotated when very tall so
    # they do not overlap on the log-scale axis)
    for bars in [bars1, bars2]:
        for bar in bars:
            height = bar.get_height()
            if height > 10:
                ax.annotate(f'{int(height)}',
                            xy=(bar.get_x() + bar.get_width() / 2, height),
                            xytext=(0, 3), textcoords="offset points",
                            ha='center', va='bottom', fontsize=7,
                            rotation=90 if height > 1000 else 0)
    ax.set_xlabel('Model')
    ax.set_ylabel('Count (log scale)')
    ax.set_yscale('log')
    ax.set_title(f'Phase and Game Counts by Model (Top {max_models})', fontweight='bold')
    ax.set_xticks(x)
    ax.set_xticklabels(model_names, rotation=45, ha='right', fontsize=8)
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    fig.savefig(output_dir / 'phase_game_counts.png', dpi=300, bbox_inches='tight')
    plt.close()
def create_comparison_heatmap(model_stats, output_dir):
    """Create comparison heatmap for top models.

    For the 20 models with the most phases (minimum 50), compares games,
    phases, overall success %, active-order %, and "complex" order %
    (support + convoy). Cell colors use per-column min-max normalization;
    annotations show the raw values. Saved as model_comparison_heatmap.png.
    """
    # Get top models by phases
    top_models = [(m, s) for m, s in model_stats.items() if s['total_phases'] >= 50]
    if not top_models:
        return
    top_models.sort(key=lambda x: x[1]['total_phases'], reverse=True)
    top_models = top_models[:20]
    fig, ax = plt.subplots(figsize=(14, 10))
    # Prepare comparison data
    comparison_data = []
    model_names = []
    for model, stats in top_models:
        total_orders = stats['total_orders']
        if total_orders > 0:
            # Overall success rate across ALL orders (holds included),
            # unlike the active-only rate used in other charts.
            success_rate = sum(stats['order_successes'].values()) / total_orders * 100
            active_rate = (total_orders - stats['order_counts']['hold']) / total_orders * 100
            complexity = (stats['order_counts']['support'] + stats['order_counts']['convoy']) / total_orders * 100
            comparison_data.append([
                len(stats['games_participated']),
                stats['total_phases'],
                success_rate,
                active_rate,
                complexity
            ])
            model_names.append(model)
    if not comparison_data:
        return
    # Create DataFrame
    columns = ['Games', 'Phases', 'Success%', 'Active%', 'Complex%']
    df = pd.DataFrame(comparison_data, index=model_names, columns=columns)
    # Min-max normalize each column for the color scale. Guard against
    # constant columns: a zero range would divide by zero and fill the
    # column with NaN, leaving blank heatmap cells. With the guard, a
    # constant column renders uniformly as 0 instead.
    col_range = (df.max() - df.min()).replace(0, 1)
    df_normalized = (df - df.min()) / col_range
    sns.heatmap(df_normalized, annot=df.round(1), fmt='g', cmap='YlOrRd',
                ax=ax, cbar_kws={'label': 'Normalized Score'}, annot_kws={'size': 9})
    ax.set_title('Top 20 Models Comparison Heatmap', fontweight='bold', pad=20)
    ax.set_xlabel('Metrics')
    ax.set_ylabel('Model')
    plt.tight_layout()
    fig.savefig(output_dir / 'model_comparison_heatmap.png', dpi=300, bbox_inches='tight')
    plt.close()
def create_unit_control_analysis(model_stats, output_dir):
    """Create unit control analysis showing performance vs unit count.

    For every model with at least 50 phases, each phase's orders are grouped
    into unit-count buckets (1-3, 4-6, 7-9, 10-12, 13+). The figure shows a
    box plot of per-model success rates per bucket plus per-bucket lines for
    the 10 models with the most phases of data. Saves
    'unit_control_analysis.png' to output_dir.
    """

    def _unit_count_bucket(unit_count):
        """Map a positive unit count to its histogram bucket label."""
        if unit_count <= 3:
            return '1-3'
        if unit_count <= 6:
            return '4-6'
        if unit_count <= 9:
            return '7-9'
        if unit_count <= 12:
            return '10-12'
        return '13+'

    # Collect one record per (model, bucket) with a success percentage.
    unit_performance_data = []
    for model, stats in model_stats.items():
        if stats['total_phases'] < 50:  # Minimum data threshold
            continue
        # Accumulate orders/successes/phases per unit-count bucket.
        unit_buckets = defaultdict(lambda: {'orders': 0, 'successes': 0, 'phases': 0})
        for phase in stats['phase_details']:
            unit_count = phase.get('unit_count', 0)
            if unit_count > 0:
                bucket = _unit_count_bucket(unit_count)
                unit_buckets[bucket]['orders'] += phase['total_orders']
                unit_buckets[bucket]['successes'] += sum(phase['order_successes'].values())
                unit_buckets[bucket]['phases'] += 1
        for bucket, data in unit_buckets.items():
            if data['orders'] > 0:
                success_rate = (data['successes'] / data['orders']) * 100
                unit_performance_data.append({
                    'model': model,
                    'bucket': bucket,
                    'success_rate': success_rate,
                    'orders': data['orders'],
                    'phases': data['phases']
                })
    if not unit_performance_data:
        print("No unit control data found")
        return
    # Create visualization: distribution (top) and per-model lines (bottom).
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 12))
    bucket_order = ['1-3', '4-6', '7-9', '10-12', '13+']
    bucket_data = defaultdict(list)
    for data in unit_performance_data:
        bucket_data[data['bucket']].append(data['success_rate'])
    # Box plot showing the spread of per-model success rates per bucket.
    box_data = [bucket_data[b] for b in bucket_order]
    positions = range(len(bucket_order))
    bp = ax1.boxplot(box_data, positions=positions, patch_artist=True)
    for patch in bp['boxes']:
        patch.set_facecolor(COLORS['move'])
        patch.set_alpha(0.7)
    ax1.set_xticks(positions)
    ax1.set_xticklabels(bucket_order)
    ax1.set_xlabel('Unit Count Range', fontsize=12)
    ax1.set_ylabel('Success Rate (%)', fontsize=12)
    ax1.set_title('Success Rate Distribution by Unit Count', fontsize=14, fontweight='bold')
    ax1.grid(True, alpha=0.3)
    ax1.axhline(y=50, color='red', linestyle='--', alpha=0.5)  # 50% reference line
    # Rank models by total phases of data for the line plot.
    # BUGFIX: previously this counted (model, bucket) records — at most 5
    # per model — rather than actual phases, making the "top 10" selection
    # nearly arbitrary.
    phase_totals = defaultdict(int)
    for data in unit_performance_data:
        phase_totals[data['model']] += data['phases']
    top_models = [m for m, _ in sorted(phase_totals.items(),
                                       key=lambda kv: kv[1], reverse=True)[:10]]
    top_models_data = defaultdict(dict)  # model -> {bucket: success_rate}
    for data in unit_performance_data:
        if data['model'] in top_models:
            top_models_data[data['model']][data['bucket']] = data['success_rate']
    for model in top_models:
        y_values = [top_models_data[model].get(bucket) for bucket in bucket_order]
        # Skip buckets with no data instead of plotting zeros.
        valid_points = [(i, y) for i, y in enumerate(y_values) if y is not None]
        if valid_points:
            x_vals, y_vals = zip(*valid_points)
            ax2.plot(x_vals, y_vals, marker='o', label=model[:30], alpha=0.7)
    ax2.set_xticks(positions)
    ax2.set_xticklabels(bucket_order)
    ax2.set_xlabel('Unit Count Range', fontsize=12)
    ax2.set_ylabel('Success Rate (%)', fontsize=12)
    ax2.set_title('Unit Control Performance - Top 10 Models', fontsize=14, fontweight='bold')
    ax2.legend(bbox_to_anchor=(1.05, 1), loc='upper left', fontsize=9)
    ax2.grid(True, alpha=0.3)
    ax2.axhline(y=50, color='red', linestyle='--', alpha=0.5)
    plt.suptitle('Unit Control Analysis - Performance vs Unit Count', fontsize=16, fontweight='bold')
    plt.tight_layout()
    fig.savefig(output_dir / 'unit_control_analysis.png', dpi=300, bbox_inches='tight')
    plt.close()
def create_success_over_physical_time(all_data, model_stats, output_dir):
    """Plot average order success rate and game volume by calendar week.

    Game dates are parsed from the leading YYYYMMDD prefix of each game id;
    games without a parseable date are skipped. model_stats is unused but
    kept for call-site compatibility with the other chart builders. Saves
    'success_over_physical_time.png' to output_dir.
    """
    # Aggregate orders/successes/game ids per week, keyed by the Monday date.
    weekly_data = defaultdict(lambda: {'orders': 0, 'successes': 0, 'games': set()})
    for game_data in all_data:
        game_id = game_data['game_id']
        # Extract the date from a 'YYYYMMDD...' game id prefix.
        game_date = None
        if len(game_id) >= 8 and game_id[:8].isdigit():
            try:
                game_date = datetime.strptime(game_id[:8], '%Y%m%d')
            except ValueError:
                # BUGFIX: was a bare `except:`; only invalid calendar dates
                # (e.g. '20231345') should be skipped here.
                continue
        if not game_date:
            continue
        # Normalize to the Monday that starts the week.
        week_start = game_date - timedelta(days=game_date.weekday())
        week_key = week_start.date()
        # Aggregate orders and successes across all models in the game
        # (the model key itself is not needed here).
        for phases in game_data['phase_data'].values():
            for phase in phases:
                weekly_data[week_key]['orders'] += phase['total_orders']
                weekly_data[week_key]['successes'] += sum(phase['order_successes'].values())
                weekly_data[week_key]['games'].add(game_id)
    if not weekly_data:
        print("No temporal data found")
        return
    # Sort weeks chronologically and compute per-week series.
    weeks = sorted(weekly_data.keys())
    success_rates = []
    game_counts = []
    for week in weeks:
        data = weekly_data[week]
        rate = (data['successes'] / data['orders']) * 100 if data['orders'] > 0 else 0
        success_rates.append(rate)
        game_counts.append(len(data['games']))
    # Two stacked panels sharing the time axis.
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 10), sharex=True)
    # Success rate over time
    ax1.plot(weeks, success_rates, marker='o', linewidth=2, markersize=8, color=COLORS['success'])
    ax1.fill_between(weeks, success_rates, alpha=0.3, color=COLORS['success'])
    ax1.set_ylabel('Average Success Rate (%)', fontsize=12)
    ax1.set_title('Success Rate Evolution Over Time', fontsize=14, fontweight='bold')
    ax1.grid(True, alpha=0.3)
    ax1.axhline(y=50, color='red', linestyle='--', alpha=0.5)  # 50% reference line
    ax1.set_ylim(0, 100)
    # Linear trend over weekly averages (only meaningful with enough weeks).
    if len(weeks) > 3:
        x_numeric = np.arange(len(weeks))
        z = np.polyfit(x_numeric, success_rates, 1)
        p = np.poly1d(z)
        ax1.plot(weeks, p(x_numeric), "--", color='black', alpha=0.5, label=f'Trend: {z[0]:.2f}% per week')
        ax1.legend()
    # Game count over time
    ax2.bar(weeks, game_counts, alpha=0.7, color=COLORS['move'])
    ax2.set_xlabel('Week Starting', fontsize=12)
    ax2.set_ylabel('Games Analyzed', fontsize=12)
    ax2.set_title('Game Volume Over Time', fontsize=14, fontweight='bold')
    ax2.grid(True, alpha=0.3)
    # Format x-axis with dates every other week.
    import matplotlib.dates as mdates
    ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    ax2.xaxis.set_major_locator(mdates.WeekdayLocator(interval=2))
    plt.xticks(rotation=45)
    plt.suptitle('Temporal Success Analysis', fontsize=16, fontweight='bold')
    plt.tight_layout()
    fig.savefig(output_dir / 'success_over_physical_time.png', dpi=300, bbox_inches='tight')
    plt.close()
def create_model_evolution_chart(all_data, model_stats, output_dir):
    """Create model evolution chart showing version improvements.

    Groups models into families by their name prefix and, for families with
    more than one member, plots each member's active-order success rate on
    one row per family; marker area scales with phase volume. Saves
    'model_evolution_chart.png' to output_dir.

    NOTE(review): all_data is accepted but unused here — presumably kept for
    signature parity with the other chart builders; confirm before removing.
    """
    # Group models by family
    model_families = defaultdict(list)
    for model in model_stats.keys():
        # Extract base model name: 'provider/model' -> provider,
        # 'model:tag' -> model, otherwise the first dash-separated token.
        if '/' in model:
            family = model.split('/')[0]
        elif ':' in model:
            family = model.split(':')[0]
        else:
            family = model.split('-')[0] if '-' in model else model
        model_families[family].append(model)
    # Find families with multiple versions; 'openrouter'/'openai' prefixes
    # are excluded (they group unrelated models rather than one lineage).
    evolving_families = {f: models for f, models in model_families.items()
                         if len(models) > 1 and f not in ['openrouter', 'openai']}
    if not evolving_families:
        print("No model families with multiple versions found")
        return
    # Create visualization
    fig, ax = plt.subplots(figsize=(14, 10))
    y_position = 0
    y_labels = []
    for family, models in sorted(evolving_families.items()):
        # Get stats for each model (skip members with no phase data)
        model_data = []
        for model in models:
            stats = model_stats[model]
            if stats['total_phases'] > 0:
                model_data.append({
                    'model': model,
                    'success_rate': stats['active_success_rate'],
                    'active_pct': stats['active_percentage'],
                    'phases': stats['total_phases']
                })
        if not model_data:
            continue
        # Sort by some metric (phases as proxy for version).
        # NOTE(review): phase count only approximates release order — this
        # assumes newer versions accumulated more data; confirm.
        model_data.sort(key=lambda x: x['phases'])
        # Plot evolution: one row per family, colored along viridis from
        # first to last member; max(..., 1) avoids /0 for 1-item edge case.
        for i, data in enumerate(model_data):
            color = plt.cm.viridis(i / max(len(model_data) - 1, 1))
            # Plot point; marker area = phases / 10
            ax.scatter(data['success_rate'], y_position, s=data['phases']/10,
                       color=color, alpha=0.7, edgecolors='black', linewidth=1)
            # Add label next to the point (trimmed to 20 chars, phase count appended)
            label = data['model'].split('/')[-1] if '/' in data['model'] else data['model']
            ax.text(data['success_rate'] + 1, y_position, f"{label[:20]} ({data['phases']}p)",
                    va='center', fontsize=8)
        y_labels.append(family)
        y_position += 1
    ax.set_yticks(range(len(y_labels)))
    ax.set_yticklabels(y_labels)
    ax.set_xlabel('Success Rate on Active Orders (%)', fontsize=12)
    ax.set_ylabel('Model Family', fontsize=12)
    ax.set_title('Model Family Evolution', fontsize=14, fontweight='bold')
    ax.grid(True, alpha=0.3, axis='x')
    ax.axvline(x=50, color='red', linestyle='--', alpha=0.5)  # 50% reference
    ax.set_xlim(0, 100)
    # Add size legend using empty proxy scatters with representative sizes.
    sizes = [100, 500, 1000]
    legends = []
    for s in sizes:
        legends.append(plt.scatter([], [], s=s/10, c='gray', alpha=0.7, edgecolors='black', linewidth=1))
    ax.legend(legends, [f'{s} phases' for s in sizes], scatterpoints=1, loc='lower right', title='Data Volume')
    plt.tight_layout()
    fig.savefig(output_dir / 'model_evolution_chart.png', dpi=300, bbox_inches='tight')
    plt.close()
def create_summary_report(model_stats, all_models_found, models_missing_phases, output_dir):
    """Write ANALYSIS_SUMMARY.md to output_dir.

    The report contains overall model counts plus two top-20 leaderboards:
    success rate on active orders, and active-order percentage (for models
    with more than 100 total orders).
    """
    lines = []
    lines.append("# CSV-Only Diplomacy Analysis Summary\n\n")
    lines.append(f"**Analysis Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
    # Headline counts across every model observed in the data.
    with_phases = len([m for m in model_stats if model_stats[m]['total_phases'] > 0])
    with_active = len([m for m in model_stats if model_stats[m]['active_percentage'] > 0])
    lines.append("## Overall Statistics\n\n")
    lines.append(f"- **Total Unique Models:** {len(all_models_found)}\n")
    lines.append(f"- **Models with Phase Data:** {with_phases}\n")
    lines.append(f"- **Models with Active Orders:** {with_active}\n")
    lines.append(f"- **Models Missing Phase Data:** {len(models_missing_phases)}\n\n")
    # Leaderboard 1: success rate on active (non-hold) orders.
    lines.append("## Top Performing Models (by Success Rate on Active Orders)\n\n")
    top_performers = [
        {
            'model': model,
            'success_rate': stats['active_success_rate'],
            'active_orders': sum(stats['order_counts'][t] for t in ['move', 'support', 'convoy']),
            'total_phases': stats['total_phases'],
        }
        for model, stats in model_stats.items()
        if stats['active_percentage'] > 0
    ]
    top_performers.sort(key=lambda p: p['success_rate'], reverse=True)
    lines.append("| Model | Success Rate | Active Orders | Phases |\n")
    lines.append("|-------|-------------|---------------|--------|\n")
    for p in top_performers[:20]:
        lines.append(f"| {p['model']} | {p['success_rate']:.1f}% | {p['active_orders']} | {p['total_phases']} |\n")
    # Leaderboard 2: share of non-hold orders, for models with enough volume.
    lines.append("\n## Most Active Models (by Active Order Percentage)\n\n")
    active_models = [
        {'model': model, 'active_pct': stats['active_percentage'], 'total_orders': stats['total_orders']}
        for model, stats in model_stats.items()
        if stats['total_orders'] > 100  # minimum volume threshold
    ]
    active_models.sort(key=lambda a: a['active_pct'], reverse=True)
    lines.append("| Model | Active % | Total Orders |\n")
    lines.append("|-------|----------|-------------|\n")
    for a in active_models[:20]:
        lines.append(f"| {a['model']} | {a['active_pct']:.1f}% | {a['total_orders']} |\n")
    # Single buffered write keeps the output identical to incremental writes.
    with open(output_dir / 'ANALYSIS_SUMMARY.md', 'w') as f:
        f.writelines(lines)
def main():
    """Command-line entry point.

    Parses the lookback window and results directory, gathers every
    'lmvsgame.json' modified within that window, analyzes each game from its
    CSV data, and writes the full visualization suite to a timestamped
    directory under 'visualization_results/'.
    """
    parser = argparse.ArgumentParser(
        description='Enhanced CSV-Only Diplomacy Model Analysis - Comprehensive Visualizations'
    )
    parser.add_argument('days', type=int, nargs='?', default=200,
                        help='Number of days to analyze (default: 200)')
    parser.add_argument('--results-dir', default='results',
                        help='Results directory containing game data')
    args = parser.parse_args()
    # Timestamped output directory keeps repeated runs separate.
    run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_dir = Path('visualization_results') / f'csv_only_enhanced_{run_stamp}_{args.days}days'
    output_dir.mkdir(parents=True, exist_ok=True)
    # Only games modified inside the lookback window are analyzed.
    cutoff_date = datetime.now() - timedelta(days=args.days)
    results_path = Path(args.results_dir)
    if not results_path.exists():
        print(f"Error: Results directory not found: {results_path}")
        sys.exit(1)
    print("Enhanced CSV-Only Diplomacy Model Analysis")
    print("=========================================")
    print(f"Analyzing games from the last {args.days} days")
    print("Using CSV files as the ONLY source of truth")
    print("Creating comprehensive visualization suite\n")
    # Collect per-game analysis results.
    all_data = []
    game_count = 0
    for game_file in results_path.rglob("lmvsgame.json"):
        if datetime.fromtimestamp(game_file.stat().st_mtime) < cutoff_date:
            continue
        game_count += 1
        if game_count % 50 == 0:  # lightweight progress indicator
            print(f"\nProcessing game {game_count}...")
        try:
            all_data.append(analyze_game(game_file))
        except Exception as e:
            # Best-effort: one corrupt game must not abort the whole run.
            print(f"✗ Failed {game_file.parent.name}: {e}")
    print(f"\n\nProcessed {game_count} games")
    # Union of model names seen across every analyzed game.
    all_models = set()
    for game_data in all_data:
        all_models.update(game_data['all_models'])
    print(f"Found {len(all_models)} unique models across all games")
    # Create comprehensive visualizations and print a closing summary.
    if all_data:
        model_stats = create_comprehensive_charts(all_data, output_dir)
        models_with_data = sum(1 for s in model_stats.values() if s['total_phases'] > 0)
        models_with_active = sum(1 for s in model_stats.values() if s['active_percentage'] > 0)
        print("\nAnalysis complete!")
        print(f"- Total unique models: {len(all_models)}")
        print(f"- Models with phase data: {models_with_data}")
        print(f"- Models with active orders: {models_with_active}")
        print(f"- Visualizations saved to: {output_dir}")
# Run the CLI entry point only when executed as a script (not on import).
if __name__ == "__main__":
    main()