AI_Diplomacy/experiments/analyze_rl_json.py
Tyler Marques a93a89f7cb
Bit of a cleanup
Moved all the files that don't need to be at top level to the
experiments folder.

Started using uv to init the project, should make the install easier.
2025-06-23 09:18:20 -07:00

463 lines
27 KiB
Python

import json
import logging
import os
import argparse
from collections import defaultdict
import pandas as pd # For easier display of grouped averages
import traceback # For detailed error logging
import sys
import re
# logging.basicConfig(level=logging.DEBUG) # Removed for more specific config
def strip_json_comments(json_string_with_comments):
    """
    Remove // style line comments from a JSON string.

    Only trailing // comments are handled; block comments and // sequences
    occurring inside string literals are NOT supported. Non-string input
    (e.g. already-parsed data or None) is returned unchanged.
    """
    if not isinstance(json_string_with_comments, str):
        # Pass through anything that is not raw text.
        return json_string_with_comments
    cleaned = [raw.split('//', 1)[0].rstrip()
               for raw in json_string_with_comments.splitlines()]
    return "\n".join(cleaned)
def extract_orders_from_llm_response(llm_response_content, model_name_for_logging="UNKNOWN_MODEL"):
    """
    Extracts a list of order strings from various formats of llm_response_content.

    Handles direct lists, JSON strings, dicts with an 'orders' key, and strings
    with embedded "PARSABLE OUTPUT:" JSON blocks. When no structured form is
    found, falls back to line-by-line heuristics. Always returns a list of
    stripped, non-empty order strings (possibly empty). Never raises.
    """
    orders = []
    processed_content = ""  # Initialize to empty string
    if isinstance(llm_response_content, list):
        # If it's already a list, assume it's a list of orders (strings)
        # Ensure all items are strings and strip them
        if all(isinstance(order, str) for order in llm_response_content):
            logging.debug(f"Model {model_name_for_logging}: llm_response_content is a list of strings. Using directly.")
            return [order.strip() for order in llm_response_content if order.strip()]
        else:
            # If list contains non-strings, try to convert to string for parsing, similar to initial design
            logging.debug(f"Model {model_name_for_logging}: llm_response_content is a list with non-string items. Converting to string for parsing.")
            processed_content = "\n".join(str(item) for item in llm_response_content)
    elif isinstance(llm_response_content, str):
        processed_content = llm_response_content
    elif isinstance(llm_response_content, dict):
        logging.debug(f"Model {model_name_for_logging}: llm_response_content is a dict. Checking for 'orders' key.")
        # Case 1: Dictionary contains a direct 'orders' list with strings.
        if "orders" in llm_response_content and isinstance(llm_response_content["orders"], list):
            potential_orders = llm_response_content["orders"]
            if all(isinstance(order, str) for order in potential_orders):
                logging.info(f"Model {model_name_for_logging}: Extracted orders directly from 'orders' key in dict llm_response.")
                return [order.strip() for order in potential_orders if order.strip()]
            else:
                # 'orders' key is a list but not all items are strings.
                logging.warning(f"Model {model_name_for_logging}: 'orders' key in dict llm_response is a list, but not all items are strings. Content: {str(potential_orders)[:200]}")
                # Fallback: Serialize the entire dictionary to a string for further parsing attempts.
                try:
                    processed_content = json.dumps(llm_response_content)
                    logging.debug(f"Model {model_name_for_logging}: Serialized dict (with non-string orders list) to string for further parsing.")
                except TypeError as te:
                    logging.warning(f"Model {model_name_for_logging}: Could not serialize dict (with non-string orders list) to JSON string: {te}. Cannot extract orders.")
                    return []  # Return empty list as we can't process this dict further.
        else:
            # Case 2: Dictionary does not have an 'orders' list or 'orders' is not a list.
            logging.debug(f"Model {model_name_for_logging}: llm_response_content is a dict but no direct 'orders' list found or 'orders' is not a list. Dict (first 300 chars): {str(llm_response_content)[:300]}.")
            # Fallback: Serialize the entire dictionary to a string for further parsing attempts.
            try:
                processed_content = json.dumps(llm_response_content)
                logging.debug(f"Model {model_name_for_logging}: Serialized dict (no direct orders list) to string for further parsing.")
            except TypeError as te:
                logging.warning(f"Model {model_name_for_logging}: Could not serialize dict (no direct orders list) to JSON string: {te}. Cannot extract orders.")
                return []  # Return empty list as we can't process this dict further.
    else:
        # llm_response_content is not a list, string, or dict (e.g., float, None).
        logging.warning(f"Model {model_name_for_logging}: llm_response_content is type {type(llm_response_content)}, not list/string/dict. Cannot extract orders. Content: {str(llm_response_content)[:200]}")
        return []  # Return empty list as this type is not processable for orders.
    # At this point, 'processed_content' should be a string derived from llm_response_content
    # (unless an early return occurred for direct list/dict order extraction or unhandled type).
    # If 'processed_content' is empty or only whitespace, no further parsing is useful.
    if not processed_content.strip():
        logging.debug(f"Model {model_name_for_logging}: llm_response_content resulted in empty or whitespace-only processed_content. No orders to extract via string parsing.")
        return []  # orders is already [], just return
    # Attempt to parse "PARSABLE OUTPUT:" block first from 'processed_content'.
    # The optional {{ / }} allow for doubled braces left over from prompt templating.
    match_parsable = re.search(r"PARSABLE OUTPUT:\s*(?:\{\{)?\s*\"orders\"\s*:\s*(\[.*?\])\s*(?:\}\})?", processed_content, re.IGNORECASE | re.DOTALL)
    if match_parsable:
        orders_json_str = match_parsable.group(1)
        try:
            content_to_parse = orders_json_str
            stripped_json_text = "[Stripping not attempted or failed before assignment]"  # Initialize placeholder
            stripped_json_text = strip_json_comments(content_to_parse)
            orders_list = json.loads(stripped_json_text)
            if isinstance(orders_list, list):
                # Ensure all items are strings, as expected for orders
                if all(isinstance(order, str) for order in orders_list):
                    orders = [str(o).strip() for o in orders_list if str(o).strip()]
                    logging.debug(f"Model {model_name_for_logging}: Extracted orders from 'PARSABLE OUTPUT:' block: {orders}")
                    return orders
                else:
                    logging.warning(f"Model {model_name_for_logging}: Parsed JSON from 'PARSABLE OUTPUT:' but not all items are strings: {orders_list}")
            else:
                logging.warning(f"Model {model_name_for_logging}: Parsed JSON from 'PARSABLE OUTPUT:' but it's not a list: {type(orders_list)}")
        except json.JSONDecodeError as e_direct:
            # Log original and stripped content for better debugging
            logging.warning(f"Model {model_name_for_logging}: Failed to parse JSON from 'PARSABLE OUTPUT:'. Error: {e_direct}. Original (first 300): '{content_to_parse[:300]}'. Stripped (first 300): '{stripped_json_text[:300]}'")
        except Exception as e_unexpected:
            logging.error(f"Model {model_name_for_logging}: Unexpected error parsing 'PARSABLE OUTPUT:' JSON. Error: {e_unexpected}. Original (first 300): '{content_to_parse[:300]}'. Stripped (first 300): '{stripped_json_text[:300]}'")
    # If not found via "PARSABLE OUTPUT:", attempt to parse the whole content as JSON
    try:
        if processed_content.strip().startswith('{') or processed_content.strip().startswith('['):
            data = json.loads(processed_content)
            if isinstance(data, dict) and 'orders' in data and isinstance(data['orders'], list):
                orders = [str(o).strip() for o in data['orders'] if str(o).strip()]
                logging.debug(f"Model {model_name_for_logging}: Extracted orders from top-level JSON 'orders' key: {orders}")
                return orders
            elif isinstance(data, list):
                potential_orders = [str(o).strip() for o in data if str(o).strip()]
                if potential_orders and all(len(po.split()) < 10 for po in potential_orders):  # Heuristic
                    orders = potential_orders
                    logging.debug(f"Model {model_name_for_logging}: Extracted orders from top-level JSON list: {orders}")
                    return orders
    except json.JSONDecodeError:
        pass  # Fall through
    # Fallback: split by lines and apply heuristics
    logging.debug(f"Model {model_name_for_logging}: No structured orders found by JSON or PARSABLE OUTPUT, falling back to line splitting. Content (first 300 chars): {processed_content[:300]}...")
    raw_lines = [order.strip() for order in processed_content.splitlines() if order.strip()]
    potential_orders_from_lines = []
    for line in raw_lines:
        parts = line.split()
        if not parts: continue
        first_word_upper = parts[0].upper()
        # A plausible order starts with a unit type (A/F) and is short;
        # the substring checks exclude lines that are clearly prose or JSON.
        if 2 <= len(parts) <= 7 and (first_word_upper == "A" or first_word_upper == "F"):
            if "REASONING:" not in line.upper() and \
               "PARSABLE OUTPUT:" not in line.upper() and \
               "{{}}" not in line and \
               "\"ORDERS\":" not in line.upper() and \
               not (line.strip().startswith('[') and line.strip().endswith(']')) and \
               not (line.strip().startswith('{') and line.strip().endswith('}')):
                potential_orders_from_lines.append(line)
    if potential_orders_from_lines:
        logging.debug(f"Model {model_name_for_logging}: Extracted orders via line splitting (fallback): {potential_orders_from_lines}")
        return potential_orders_from_lines
    else:
        logging.debug(f"Model {model_name_for_logging}: No orders extracted via line splitting fallback. Content (first 300 chars): {processed_content[:300]}")
        return []
import sys
import traceback
import pandas as pd # Ensure pandas is imported at the top
def _perform_analysis(json_directory):
    """
    Scan json_directory for '*_rl.json' files and aggregate statistics.

    Returns a dict with keys:
        total_rows: total number of JSON objects across all files.
        total_characters: sum of file sizes in bytes.
        response_type_stats: per-response_type lists of prompt/response lengths.
        model_order_stats: per-model convoy/support counts for successful and
            failed order sets.
        all_success_values: every distinct 'success' value seen (debug aid).
        json_files_processed: number of files read.
        error: None on success, otherwise a human-readable message.
    """
    def _empty_result(error_msg):
        # All early exits return this same shape so callers can index safely.
        # (Previously this dict was duplicated verbatim at two return sites.)
        return {
            "total_rows": 0,
            "total_characters": 0,
            "response_type_stats": defaultdict(lambda: {'prompt_chars': [], 'response_chars': []}),
            "model_order_stats": defaultdict(lambda: {
                'successful_convoys': 0,
                'successful_supports': 0,
                'total_successful_orders_processed': 0,
                'failed_convoys': 0,
                'failed_supports': 0,
                'total_failed_order_sets_processed': 0,
            }),
            "all_success_values": set(),
            "json_files_processed": 0,
            "error": error_msg,
        }

    logging.info(f"Starting analysis of JSON directory: {json_directory}")
    total_rows = 0
    total_characters = 0
    response_type_stats = defaultdict(lambda: {'prompt_chars': [], 'response_chars': []})
    model_order_stats = defaultdict(lambda: {
        'successful_convoys': 0,
        'successful_supports': 0,
        'total_successful_orders_processed': 0,
        'failed_convoys': 0,
        'failed_supports': 0,
        'total_failed_order_sets_processed': 0,
    })
    all_success_values = set()
    json_files_processed = 0

    if not os.path.isdir(json_directory):
        logging.error(f"Error: Directory not found: {json_directory}")
        return _empty_result(f"Directory not found: {json_directory}")

    for filename in os.listdir(json_directory):
        if not filename.endswith("_rl.json"):
            continue
        file_path = os.path.join(json_directory, filename)
        logging.info(f"Processing file: {file_path}...")
        try:
            file_size = os.path.getsize(file_path)
            total_characters += file_size
            json_files_processed += 1
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, list):
                # Fixed: log the actual file name (was a literal "(unknown)" placeholder).
                logging.warning(f"  Warning: Expected a list of objects in {filename}, got {type(data)}. Skipping file.")
                continue
            total_rows += len(data)
            for entry in data:
                response_type = entry.get('response_type', "UNKNOWN_RESPONSE_TYPE")
                prompt_content = entry.get('prompt')
                llm_response_content = entry.get('llm_response')
                if response_type == "order_generation":
                    model = entry.get('model', 'UNKNOWN_MODEL')
                    success = entry.get('success')
                    # 'success' may be a bool or a status string like "Success".
                    is_successful_order_set = success is True or (
                        isinstance(success, str) and success.lower() == 'success')
                    all_success_values.add(success)  # Collect all success values for the debug report.
                    if is_successful_order_set and llm_response_content is not None:
                        model_order_stats[model]['total_successful_orders_processed'] += 1
                        orders_list = extract_orders_from_llm_response(llm_response_content, model)
                        for order_str in orders_list or []:
                            parts = order_str.upper().split()
                            # Convoy: 7 tokens "A/F X C A/F Y - Z"; Support: "A/F X S A/F ...".
                            if len(parts) == 7 and parts[0] in ['A', 'F'] and parts[2] == 'C' and parts[3] in ['A', 'F'] and parts[5] == '-':
                                model_order_stats[model]['successful_convoys'] += 1
                            elif len(parts) >= 4 and parts[2] == 'S' and parts[0] in ['A', 'F'] and parts[3] in ['A', 'F']:
                                model_order_stats[model]['successful_supports'] += 1
                    elif not is_successful_order_set:
                        model_order_stats[model]['total_failed_order_sets_processed'] += 1
                        potential_failed_orders = []
                        failure_reason_str = str(entry.get('success', ''))
                        # The failing order may be embedded in the failure message itself.
                        failure_prefix = "Failure: Invalid LLM Moves (1): "
                        if failure_reason_str.startswith(failure_prefix):
                            failed_order_from_success = failure_reason_str[len(failure_prefix):].strip()
                            if failed_order_from_success:
                                potential_failed_orders.append(failed_order_from_success)
                        # Otherwise fall back to whatever orders the raw response contains.
                        if not potential_failed_orders and llm_response_content is not None:
                            extracted_llm_orders = extract_orders_from_llm_response(llm_response_content, model)
                            if extracted_llm_orders:
                                potential_failed_orders.extend(extracted_llm_orders)
                        for order_str in potential_failed_orders:
                            parts = order_str.upper().split()
                            if len(parts) == 7 and parts[0] in ['A', 'F'] and parts[2] == 'C' and parts[3] in ['A', 'F'] and parts[5] == '-':
                                model_order_stats[model]['failed_convoys'] += 1
                            elif len(parts) >= 4 and parts[2] == 'S' and parts[0] in ['A', 'F'] and parts[3] in ['A', 'F']:
                                model_order_stats[model]['failed_supports'] += 1
                if prompt_content is not None and isinstance(prompt_content, str):
                    response_type_stats[response_type]['prompt_chars'].append(len(prompt_content))
                if llm_response_content is not None:
                    if isinstance(llm_response_content, str):
                        response_type_stats[response_type]['response_chars'].append(len(llm_response_content))
                    else:
                        # Non-string responses (dicts/lists) are measured by their JSON length.
                        try:
                            response_str = json.dumps(llm_response_content)
                            response_type_stats[response_type]['response_chars'].append(len(response_str))
                        except TypeError:
                            logging.warning(f"  Warning: Could not serialize llm_response in {filename}.")
        except json.JSONDecodeError:
            logging.warning(f"  Warning: Could not decode JSON from {filename}. Skipping file.")
        except Exception as e:
            logging.warning(f"  Warning: An error occurred processing {filename}: {e}. Skipping file.")

    # (The directory-missing case already returned above, so only the
    # "no matching files" case remains here.)
    if json_files_processed == 0:
        logging.warning(f"No '*_rl.json' files found in {json_directory}.")
        return _empty_result(f"No '*_rl.json' files found in {json_directory}")

    return {
        "total_rows": total_rows,
        "total_characters": total_characters,
        "response_type_stats": response_type_stats,
        "model_order_stats": model_order_stats,
        "all_success_values": all_success_values,
        "json_files_processed": json_files_processed,
        "error": None,
    }
def _write_summary_output(output_file_path, analysis_data, is_debug_output):
    """
    Write an analysis summary (as produced by _perform_analysis) to output_file_path.

    When is_debug_output is True, additionally dumps the raw 'success' field
    values seen during analysis. A dedicated, non-propagating file logger
    records progress/errors for this one output file.
    """
    # Setup file-specific logger for this output operation
    file_logger = logging.getLogger(f"writer_{os.path.basename(output_file_path)}")
    # Prevent propagation to avoid duplicate console logs if root logger has StreamHandler
    file_logger.propagate = False
    file_logger.setLevel(logging.DEBUG)
    # Clear existing handlers for this specific logger to avoid duplication if called multiple times for same file (though unlikely with current design)
    if file_logger.hasHandlers():
        file_logger.handlers.clear()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh = logging.FileHandler(output_file_path, mode='w')
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)
    file_logger.addHandler(fh)
    file_logger.info(f"Writing summary to: {output_file_path}. Debug mode: {is_debug_output}")
    outfile = None
    try:
        # NOTE(review): the FileHandler above and this open() both target the
        # same path in 'w' mode, so the summary text and the log lines write
        # through independent file handles — confirm this interleaving is intended.
        outfile = open(output_file_path, 'w')
        file_logger.debug(f"Output file '{output_file_path}' opened successfully for writing.")
        outfile.write(f"Analysis Summary - {'Debug' if is_debug_output else 'Standard'}\n")
        outfile.write(f"Generated on: {pd.Timestamp.now()}\n")
        if analysis_data.get("error"):
            outfile.write(f"Analysis Error: {analysis_data['error']}\n")
            file_logger.error(f"Analysis error reported: {analysis_data['error']}")
            return  # Stop writing if there was a critical analysis error
        outfile.write("\n--- Overall Statistics ---\n")
        outfile.write(f"Total JSON files processed: {analysis_data['json_files_processed']}\n")
        outfile.write(f"Total JSON objects (rows) generated: {analysis_data['total_rows']}\n")
        outfile.write(f"Total characters of JSON generated (sum of file sizes): {analysis_data['total_characters']:,}\n")
        outfile.write("\n--- Average Lengths by Response Type (in characters) ---\n")
        response_type_stats = analysis_data['response_type_stats']
        outfile.write(f"Found {len(response_type_stats)} unique response_type categories.\n")
        avg_data = []
        for rt, stats_item in response_type_stats.items():
            avg_prompt_len = sum(stats_item['prompt_chars']) / len(stats_item['prompt_chars']) if stats_item['prompt_chars'] else 0
            avg_response_len = sum(stats_item['response_chars']) / len(stats_item['response_chars']) if stats_item['response_chars'] else 0
            # Prompt and response lists can have different lengths; report the larger.
            count = max(len(stats_item['prompt_chars']), len(stats_item['response_chars']))
            avg_data.append({
                'Response Type': rt, 'Count': count,
                'Avg Prompt Length': f"{avg_prompt_len:.2f}",
                'Avg LLM Response Length': f"{avg_response_len:.2f}"
            })
        if avg_data:
            outfile.write(pd.DataFrame(avg_data).to_string(index=False) + "\n")
        else:
            outfile.write("No data available for response type analysis.\n")
        outfile.write("\n--- Convoy/Support Orders by Model ---\n")  # Renamed for clarity
        model_order_stats = analysis_data['model_order_stats']
        order_stats_data = []
        sorted_models = sorted(model_order_stats.keys())
        for model_key in sorted_models:
            counts = model_order_stats[model_key]
            order_stats_data.append({
                'Model': model_key,
                'Successful Convoys': counts['successful_convoys'],
                'Successful Supports': counts['successful_supports'],
                'Total Successful Sets': counts['total_successful_orders_processed'],
                'Failed Convoys': counts['failed_convoys'],
                'Failed Supports': counts['failed_supports'],
                'Total Failed Sets': counts['total_failed_order_sets_processed']
            })
        if order_stats_data:
            outfile.write(pd.DataFrame(order_stats_data).to_string(index=False) + "\n")
        else:
            outfile.write("No convoy or support order data found.\n")
        if is_debug_output:
            # Debug-only section: raw 'success' values and which of them are non-successful.
            all_success_values = analysis_data['all_success_values']
            outfile.write("\n--- Unique Values in 'success' field (for order_generation) ---\n")
            # str() key makes mixed types (bool/str/None) sortable together.
            sorted_success_values = sorted(list(all_success_values), key=lambda x: str(x) if x is not None else '')
            for val in sorted_success_values:
                outfile.write(f"{val} (Type: {type(val).__name__})\n")
            non_successful_values = set()
            for val_iter in all_success_values:
                is_val_successful = False
                if val_iter is True or (isinstance(val_iter, str) and val_iter.lower() in ['success', 'true']):
                    is_val_successful = True
                if not is_val_successful:
                    non_successful_values.add(val_iter)
            outfile.write("\n--- Identified Non-Successful 'success' values (for order_generation) ---\n")
            if non_successful_values:
                sorted_non_successful = sorted(list(non_successful_values), key=lambda x: str(x) if x is not None else '')
                for val_ns in sorted_non_successful:
                    outfile.write(f"{val_ns} (Type: {type(val_ns).__name__})\n")
            else:
                outfile.write("No specific non-successful values identified beyond False/None or unhandled strings.\n")
        outfile.write("\nAnalysis script finished successfully.\n")
        file_logger.info(f"Successfully wrote summary to {output_file_path}")
        print(f"Analysis complete. Summary saved to: {output_file_path}")
    except Exception as e:
        file_logger.error(f"Error writing summary to {output_file_path}: {e}", exc_info=True)
        print(f"FATAL ERROR writing to {output_file_path}: {e}")
        # Best-effort: record the failure inside the partially-written summary itself.
        if outfile and not outfile.closed:
            try:
                outfile.write(f"\nFATAL ERROR during summary writing: {e}\nTraceback:\n{traceback.format_exc()}\n")
            except Exception as write_err:
                file_logger.error(f"Error writing FATAL ERROR message to {output_file_path}: {write_err}")
    finally:
        if outfile and not outfile.closed:
            file_logger.debug(f"Closing output file {output_file_path} in 'finally' block.")
            outfile.close()
        # Important: Remove the file handler for this specific file to avoid issues on next call
        if fh in file_logger.handlers:
            file_logger.removeHandler(fh)
        fh.close()
def generate_analysis_reports(json_directory):
    """
    Run the analysis over json_directory and write both the standard and the
    debug summary files ('analysis_summary.txt' / 'analysis_summary_debug.txt'
    in the current working directory).

    Side effect: reconfigures the ROOT logger (removes existing handlers and
    installs a stdout handler at INFO level).
    """
    # Configure root logger for console output (INFO and higher)
    # This setup is done once here.
    root_logger = logging.getLogger()  # Get the root logger
    root_logger.setLevel(logging.DEBUG)  # Set root to DEBUG to allow handlers to control their own levels
    # Clear any existing handlers on the root logger to avoid duplication if script is re-run in same session
    if root_logger.hasHandlers():
        # Be careful clearing all handlers if other libraries also use logging.
        # For a standalone script, this is usually fine.
        # Alternatively, manage handlers more selectively or use a dedicated logger for this app.
        for handler in root_logger.handlers[:]:  # Iterate over a copy
            root_logger.removeHandler(handler)
            handler.close()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    sh = logging.StreamHandler(sys.stdout)
    sh.setLevel(logging.INFO)
    sh.setFormatter(formatter)
    root_logger.addHandler(sh)
    logging.info(f"Starting analysis report generation for directory: {json_directory}")
    try:
        analysis_data = _perform_analysis(json_directory)
        standard_output_path = os.path.abspath('analysis_summary.txt')
        debug_output_path = os.path.abspath('analysis_summary_debug.txt')
        logging.info(f"Proceeding to write standard summary to {standard_output_path}")
        _write_summary_output(standard_output_path, analysis_data, is_debug_output=False)
        logging.info(f"Proceeding to write debug summary to {debug_output_path}")
        _write_summary_output(debug_output_path, analysis_data, is_debug_output=True)
        logging.info("All analysis reports generated successfully.")
    except Exception as e:
        logging.critical(f"A critical error occurred during analysis report generation: {e}", exc_info=True)
        print(f"CRITICAL SCRIPT ERROR: {e}")
        # Attempt to write to a fallback error log for the main orchestrator
        try:
            with open('analyze_rl_json_CRITICAL_ERROR.log', 'a') as err_log:  # Append mode for critical errors
                err_log.write(f"Timestamp: {pd.Timestamp.now()}\n")
                err_log.write(f"Failed during generate_analysis_reports for directory: {json_directory}.\nError: {e}\n")
                err_log.write(f"Traceback:\n{traceback.format_exc()}\n---\n")
        except Exception as e_fallback:
            print(f"CRITICAL FALLBACK LOGGING FAILED: {e_fallback}")
if __name__ == '__main__':
    # CLI entry point: a single positional argument naming the input directory.
    arg_parser = argparse.ArgumentParser(
        description='Analyze generated RL JSON files and produce standard and debug summaries.')
    arg_parser.add_argument(
        'json_dir', type=str,
        help='Directory containing the *_rl.json files to analyze.')
    # --output_file argument is removed
    cli_args = arg_parser.parse_args()
    generate_analysis_reports(os.path.abspath(cli_args.json_dir))