WIP: Can play with bots using old frontend, and multi_bot_launcher

This is a very fragile first pass. There are no messages sent, and
sometimes the bots die without warning. But, you'll get a few good turns
in before they do and during that time, it's truly glorious.
This commit is contained in:
Tyler Marques 2025-06-23 15:56:05 -07:00
parent a05ca5d0df
commit 02a3ef8d38
No known key found for this signature in database
GPG key ID: CB99EDCF41D3016F
10 changed files with 139 additions and 730 deletions

View file

@ -463,13 +463,9 @@ class DiplomacyAgent:
f"[{self.power_name}] Using agent's own model for consolidation instead of Gemini Flash"
)
# Use the enhanced wrapper with retry logic
from .utils import run_llm_and_log
raw_response = await run_llm_and_log(
client=consolidation_client,
prompt=prompt,
log_file_path=log_file_path,
power_name=self.power_name,
phase=game.current_short_phase,
response_type="diary_consolidation",
@ -681,7 +677,6 @@ class DiplomacyAgent:
raw_response = await run_llm_and_log(
client=self.client,
prompt=full_prompt,
log_file_path=log_file_path, # Pass the main log file path
power_name=self.power_name,
phase=game.current_short_phase,
response_type="negotiation_diary_raw", # For run_llm_and_log context
@ -922,7 +917,6 @@ class DiplomacyAgent:
raw_response = await run_llm_and_log(
client=self.client,
prompt=prompt,
log_file_path=log_file_path,
power_name=self.power_name,
phase=game.current_short_phase,
response_type="order_diary",
@ -1019,6 +1013,7 @@ class DiplomacyAgent:
logger.warning(
f"[{self.power_name}] Added fallback order diary entry due to critical error."
)
raise e
# Rest of the code remains the same
async def generate_phase_result_diary_entry(
@ -1100,7 +1095,6 @@ class DiplomacyAgent:
raw_response = await run_llm_and_log(
client=self.client,
prompt=prompt,
log_file_path=log_file_path,
power_name=self.power_name,
phase=game.current_short_phase,
response_type="phase_result_diary",
@ -1154,7 +1148,7 @@ class DiplomacyAgent:
board_state: dict,
phase_summary: str,
game_history: "GameHistory",
log_file_path: str,
log_file_path: str | Path,
):
"""Analyzes the outcome of the last phase and updates goals/relationships using the LLM."""
# Use self.power_name internally
@ -1248,7 +1242,6 @@ class DiplomacyAgent:
response = await run_llm_and_log(
client=self.client,
prompt=prompt,
log_file_path=log_file_path,
power_name=power_name,
phase=current_phase,
response_type="state_update",

View file

@ -1,5 +1,6 @@
# ai_diplomacy/initialization.py
import json
from pathlib import Path
from loguru import logger
# Forward declaration for type hinting, actual imports in function if complex
@ -13,7 +14,10 @@ from .prompt_constructor import build_context_prompt
async def initialize_agent_state_ext(
agent: DiplomacyAgent, game: Game, game_history: GameHistory, log_file_path: str
agent: DiplomacyAgent,
game: Game,
game_history: GameHistory,
log_file_path: str | Path,
):
"""Uses the LLM to set initial goals and relationships for the agent."""
power_name = agent.power_name
@ -66,7 +70,6 @@ async def initialize_agent_state_ext(
response = await run_llm_and_log(
client=agent.client,
prompt=full_prompt,
log_file_path=log_file_path,
power_name=power_name,
phase=current_phase,
response_type="initialization", # Context for run_llm_and_log internal error logging

View file

@ -301,13 +301,8 @@ def log_llm_response(
"""Appends a raw LLM response to a CSV log file."""
assert log_file_path is not None
try:
# Ensure the directory exists
log_dir = os.path.dirname(log_file_path)
if log_dir: # Ensure log_dir is not empty (e.g., if path is just a filename)
os.makedirs(log_dir, exist_ok=True)
# Check if file exists to write header
file_exists = os.path.isfile(log_file_path)
file_exists = log_file_path.exists()
with open(log_file_path, "a", newline="", encoding="utf-8") as csvfile:
# Added "raw_input" to fieldnames
@ -348,7 +343,6 @@ def log_llm_response(
async def run_llm_and_log(
client: "BaseModelClient",
prompt: str,
log_file_path: str, # Kept for context, but not used for logging here
power_name: Optional[str], # Kept for context, but not used for logging here
phase: str, # Kept for context, but not used for logging here
response_type: str, # Kept for context, but not used for logging here
@ -365,4 +359,3 @@ async def run_llm_and_log(
)
# raw_response remains "" indicating failure to the caller
return raw_response

View file

@ -1,7 +1,52 @@
from os.path import exists
from pydantic_settings import BaseSettings
from pathlib import Path
import warnings
class Configuration(BaseSettings):
DEBUG: bool = False
log_file_path: Path = Path("./logs/")
log_file_path: Path = Path("./logs/logs.txt")
DEEPSEEK_API_KEY: str | None = None
OPENAI_API_KEY: str | None = None
ANTHROPIC_API_KEY: str | None = None
GEMINI_API_KEY: str | None = None
OPENROUTER_API_KEY: str | None = None
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Make the path absolute, gets rid of weirdness of calling this in different places
self.log_file_path = self.log_file_path.resolve()
self.log_file_path.parent.mkdir(parents=True, exist_ok=True)
self.log_file_path.touch(exist_ok=True)
self._validate_api_keys()
def _validate_api_keys(self):
"""Validate API keys at startup and issue warnings for missing keys"""
api_keys = [
"DEEPSEEK_API_KEY",
"OPENAI_API_KEY",
"ANTHROPIC_API_KEY",
"GEMINI_API_KEY",
"OPENROUTER_API_KEY",
]
for key in api_keys:
value = super().__getattribute__(key)
if not value or (isinstance(value, str) and len(value) == 0):
warnings.warn(f"API key '{key}' is not set or is empty", UserWarning)
def __getattribute__(self, name):
"""Override to check for empty API keys at access time"""
value = super().__getattribute__(name)
if name.endswith("_KEY") and (
not value or (isinstance(value, str) and len(value) == 0)
):
raise ValueError(
f"API key '{name}' is not set or is empty. Please configure it before use."
)
return value

View file

@ -1,6 +0,0 @@
def main():
print("Hello from bot-client!")
if __name__ == "__main__":
main()

View file

@ -214,14 +214,11 @@ class MultiBotLauncher:
active_processes.append(process)
# Read and log any output (non-blocking)
try:
while True:
line = process.stdout.readline()
if not line:
break
print(f"Bot-{process.pid}: {line.strip()}")
except:
pass # No output available
while True:
line = process.stdout.readline()
if not line:
break
logger.info(f"Bot_{process.pid}: {line.strip()}")
else:
# Process has ended
return_code = process.returncode
@ -230,14 +227,9 @@ class MultiBotLauncher:
)
# Read any remaining output
try:
remaining_output = process.stdout.read()
if remaining_output:
print(
f"Bot-{process.pid} final output: {remaining_output}"
)
except:
pass
remaining_output = process.stdout.read()
if remaining_output:
print(f"Bot-{process.pid} final output: {remaining_output}")
self.bot_processes = active_processes

View file

@ -6,34 +6,31 @@ and waits for its turn to make moves. This script is designed to be run
as a separate process for each bot in a multi-player game.
"""
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
import argparse
import asyncio
import os
import signal
from typing import Optional
from typing import Optional, Dict
import dotenv
from loguru import logger
import sys
from typed_websocket_client import (
TypedWebSocketDiplomacyClient,
from websocket_diplomacy_client import (
WebSocketDiplomacyClient,
connect_to_diplomacy_server,
)
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from diplomacy.utils.exceptions import DiplomacyException, GameIdException
# Suppress warnings
os.environ["GRPC_PYTHON_LOG_LEVEL"] = "40"
os.environ["GRPC_VERBOSITY"] = "ERROR"
os.environ["ABSL_MIN_LOG_LEVEL"] = "2"
os.environ["GRPC_POLL_STRATEGY"] = "poll"
# os.environ["GRPC_PYTHON_LOG_LEVEL"] = "40"
# os.environ["GRPC_VERBOSITY"] = "ERROR"
# os.environ["ABSL_MIN_LOG_LEVEL"] = "2"
# os.environ["GRPC_POLL_STRATEGY"] = "poll"
# Add parent directory to path for ai_diplomacy imports (runtime only)
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from diplomacy.engine.message import Message
@ -88,7 +85,7 @@ class SingleBotPlayer:
self.game_id = game_id
# Bot state
self.client: TypedWebSocketDiplomacyClient
self.client: WebSocketDiplomacyClient
self.agent: DiplomacyAgent
self.game_history = GameHistory()
self.running = True
@ -97,7 +94,9 @@ class SingleBotPlayer:
self.orders_submitted = False
# Track error stats
self.error_stats = {"conversation_errors": 0, "order_decoding_errors": 0}
self.error_stats: Dict[str, Dict[str, int]] = {
self.model_name: {"conversation_errors": 0, "order_decoding_errors": 0}
}
# Setup signal handlers for graceful shutdown
signal.signal(signal.SIGINT, self._signal_handler)
@ -128,7 +127,7 @@ class SingleBotPlayer:
)
else:
logger.info(f"Creating new game as {self.power_name}")
game = await self.client.create_game(
await self.client.create_game(
map_name="standard",
rules=["IGNORE_ERRORS", "POWER_CHOICE"], # Allow messages
power_name=self.power_name,
@ -144,15 +143,15 @@ class SingleBotPlayer:
# Initialize agent state
await initialize_agent_state_ext(
self.agent, self.client.game, self.game_history, "./llm_log.txt"
self.agent, self.client.game, self.game_history, config.log_file_path
)
# Setup game event callbacks
await self._setup_event_callbacks()
# Get initial game state
await self.client.synchronize()
self.current_phase = self.client.get_current_phase()
await self.client.game.synchronize()
self.current_phase = self.client.game.get_current_phase()
self.game_history.add_phase(self.current_phase)
logger.info(f"Bot initialized. Current phase: {self.current_phase}")
@ -181,14 +180,19 @@ class SingleBotPlayer:
logger.debug("Event callbacks setup complete")
async def _on_phase_update(self, game, notification):
def _on_phase_update(self, game, notification):
"""Handle game phase updates."""
logger.info(f"Phase update received: {notification.phase_data}")
# Update our game state
await self.client.synchronize()
# Schedule the async processing in the event loop
asyncio.create_task(self._handle_phase_update_async(notification))
new_phase = self.client.get_current_phase()
async def _handle_phase_update_async(self, notification):
"""Async handler for phase updates."""
# Update our game state
await self.client.game.synchronize()
new_phase = self.client.game.get_current_phase()
if new_phase != self.current_phase:
logger.info(f"New phase: {new_phase} (was: {self.current_phase})")
self.current_phase = new_phase
@ -198,12 +202,17 @@ class SingleBotPlayer:
# Check if we need to submit orders for this new phase
await self._check_if_orders_needed()
async def _on_game_processed(self, game, notification):
def _on_game_processed(self, game, notification):
"""Handle game processing (when orders are executed)."""
logger.info("Game processed - orders have been executed")
# Schedule the async processing in the event loop
asyncio.create_task(self._handle_game_processed_async())
async def _handle_game_processed_async(self):
"""Async handler for game processing."""
# Synchronize to get the results
await self.client.synchronize()
await self.client.game.synchronize()
# Analyze the results
await self._analyze_phase_results()
@ -220,17 +229,17 @@ class SingleBotPlayer:
# Add message to game history
self.game_history.add_message(
phase=message.phase,
phase_name=message.phase,
sender=message.sender,
recipient=message.recipient,
content=message.message,
message_content=message.message,
)
# If it's a private message to us, consider responding
if message.recipient == self.power_name and message.sender != self.power_name:
await self._consider_message_response(message)
async def _on_status_update(self, game, notification):
def _on_status_update(self, game, notification):
"""Handle game status changes."""
logger.info(f"Game status updated: {notification.status}")
@ -238,7 +247,7 @@ class SingleBotPlayer:
logger.info("Game has ended")
self.running = False
async def _on_powers_update(self, game, notification):
def _on_powers_update(self, game, notification):
"""Handle power controller updates (players joining/leaving)."""
logger.info("Powers controllers updated")
# Could implement logic to react to new players joining
@ -249,25 +258,22 @@ class SingleBotPlayer:
return
# Check if it's a phase where we can submit orders
current_short_phase = self.client.get_current_short_phase()
current_short_phase = self.client.game.current_short_phase
# We submit orders in Movement and Retreat phases
if current_short_phase.endswith("M") or current_short_phase.endswith("R"):
# Check if we have units that can receive orders
try:
orderable_locations = self.client.get_orderable_locations(
self.power_name
orderable_locations = self.client.game.get_orderable_locations(
self.power_name
)
if orderable_locations:
logger.info(f"Orders needed for phase {current_short_phase}")
self.waiting_for_orders = True
await self._submit_orders()
else:
logger.info(
f"No orderable locations for {self.power_name} in {current_short_phase}"
)
if orderable_locations:
logger.info(f"Orders needed for phase {current_short_phase}")
self.waiting_for_orders = True
await self._submit_orders()
else:
logger.info(
f"No orderable locations for {self.power_name} in {current_short_phase}"
)
except Exception as e:
logger.error(f"Error checking orderable locations: {e}")
async def _submit_orders(self):
"""Generate and submit orders for the current phase."""
@ -279,7 +285,7 @@ class SingleBotPlayer:
logger.info("Generating orders...")
# Get current board state
board_state = self.client.get_state()
board_state = self.client.game.get_state()
# Get possible orders
possible_orders = gather_possible_orders(self.client.game, self.power_name)
@ -302,7 +308,7 @@ class SingleBotPlayer:
agent_goals=self.agent.goals,
agent_relationships=self.agent.relationships,
agent_private_diary_str=self.agent.format_private_diary_for_prompt(),
phase=self.current_phase,
phase=self.client.game.get_current_phase(),
)
# Submit orders
@ -323,9 +329,13 @@ class SingleBotPlayer:
self.orders_submitted = True
self.waiting_for_orders = False
logger.info("Orders submitted successfully")
# Call the no wait so we don't sit around for the turns to end.
self.client.game.no_wait()
except DiplomacyException as e:
logger.error(f"Error submitting orders: {e}", exc_info=True)
# FIXME: I don't think we want to do this. Likely want to retry again multiple times.
#
# Submit empty orders as fallback
try:
await self.client.set_orders(self.power_name, [])
@ -339,7 +349,7 @@ class SingleBotPlayer:
logger.info("Analyzing phase results...")
# Get current board state after processing
board_state = self.client.get_state()
board_state = self.client.game.get_state()
# Generate a simple phase summary
phase_summary = f"Phase {self.current_phase} completed."
@ -350,7 +360,7 @@ class SingleBotPlayer:
board_state=board_state,
phase_summary=phase_summary,
game_history=self.game_history,
log_file_path=None,
log_file_path=config.log_file_path,
)
logger.info("Phase analysis complete")
@ -366,7 +376,7 @@ class SingleBotPlayer:
word in message.message.lower() for word in ["hello", "hi", "greetings"]
):
response = f"Hello {message.sender}! Good to hear from you."
await self.client.send_message(
await self.client.game.send_game_message(
sender=self.power_name, recipient=message.sender, message=response
)
logger.info(f"Sent response to {message.sender}: {response}")
@ -382,22 +392,17 @@ class SingleBotPlayer:
logger.info(f"Bot {self.username} ({self.power_name}) is now running...")
# Main event loop
while self.running and not self.client.is_game_done:
try:
# Synchronize with server periodically
await self.client.synchronize()
while self.running and not self.client.game.is_game_done:
# Synchronize with server periodically
await self.client.game.synchronize()
# Check if we need to submit orders
await self._check_if_orders_needed()
# Check if we need to submit orders
await self._check_if_orders_needed()
# Sleep for a bit before next iteration
await asyncio.sleep(5)
# Sleep for a bit before next iteration
await asyncio.sleep(5)
except Exception as e:
logger.error(f"Error in main loop: {e}", exc_info=True)
await asyncio.sleep(10) # Wait longer on error
if self.client.is_game_done:
if self.client.game.is_game_done:
logger.info("Game has finished")
else:
logger.info("Bot shutting down")
@ -411,8 +416,6 @@ class SingleBotPlayer:
async def cleanup(self):
"""Clean up resources."""
try:
if self.client and self.client.game:
await self.client.game.leave()
if self.client:
await self.client.close()
logger.info("Cleanup complete")
@ -426,7 +429,7 @@ def parse_arguments():
parser.add_argument("--hostname", default="localhost", help="Server hostname")
parser.add_argument("--port", type=int, default=8432, help="Server port")
parser.add_argument("--username", default="bot_player", help="Bot username")
parser.add_argument("--username", help="Bot username")
parser.add_argument("--password", default="password", help="Bot password")
parser.add_argument("--power", default="FRANCE", help="Power to control")
parser.add_argument("--model", default="gpt-3.5-turbo", help="AI model to use")
@ -441,6 +444,8 @@ def parse_arguments():
async def main():
"""Main entry point."""
args = parse_arguments()
if not args.username:
args.username = f"bot_{args.power}"
bot = SingleBotPlayer(
hostname=args.hostname,

View file

@ -1,418 +0,0 @@
"""
Typed WebSocket Diplomacy Client
This demonstrates how to create a fully typed WebSocket client using the pydantic models
from models.py, providing type safety and protocol compliance.
This is a demonstration/reference implementation showing how to properly use the
typed messages with raw WebSocket connections.
"""
import os
import asyncio
import json
import uuid
import websockets
from typing import Dict, Any, Optional, Union, List
from loguru import logger
# Add parent directory to path for ai_diplomacy imports (runtime only)
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from diplomacy.client.connection import connect
from diplomacy.client.network_game import NetworkGame
from diplomacy.engine.message import Message
from models import (
# Request messages
SignInRequest,
CreateGameRequest,
JoinGameRequest,
ListGamesRequest,
SetOrdersRequest,
ProcessGameRequest,
GetAllPossibleOrdersRequest,
# Response messages
DataTokenResponse,
DataGameResponse,
DataGamesResponse,
OkResponse,
ErrorResponse,
# Utility functions
parse_message,
serialize_message,
WebSocketMessage,
ResponseMessage,
NotificationMessage,
)
class TypedWebSocketDiplomacyClient:
"""
A fully typed WebSocket client that uses pydantic models for all communications.
This demonstrates the proper way to implement the WebSocket protocol
as documented in WEBSOCKET.md using type-safe message models.
"""
def __init__(
self, hostname: str = "localhost", port: int = 8432, use_ssl: bool = False
):
"""Initialize the typed WebSocket client."""
self.hostname = hostname
self.port = port
self.use_ssl = use_ssl
self.websocket = None
self.token: Optional[str] = None
self.game_id: Optional[str] = None
self.game_role: Optional[str] = None
self._pending_requests: Dict[str, asyncio.Future] = {}
async def connect_and_authenticate(self, username: str, password: str) -> None:
"""
Connect to the server and authenticate.
Args:
username: Username for authentication
password: Password for authentication
"""
logger.info(f"Connecting to {self.hostname}:{self.port}")
self.connection = await connect(self.hostname, self.port)
logger.info(f"Authenticating as {username}")
self.channel = await self.connection.authenticate(username, password)
self.username = username
self.token = self.channel.token
logger.info("Successfully connected and authenticated")
async def _message_handler(self):
"""Handle incoming WebSocket messages."""
try:
async for message in self.websocket:
try:
data = json.loads(message)
parsed_msg = parse_message(data)
await self._handle_message(parsed_msg)
except Exception as e:
logger.error(f"Error handling message: {e}")
logger.debug(f"Raw message: {message}")
except websockets.exceptions.ConnectionClosed:
logger.info("WebSocket connection closed")
except Exception as e:
logger.error(f"Message handler error: {e}")
async def _handle_message(self, message: WebSocketMessage):
"""Process an incoming parsed message."""
if (
hasattr(message, "request_id")
and message.request_id in self._pending_requests
):
# This is a response to a request we sent
future = self._pending_requests.pop(message.request_id)
future.set_result(message)
else:
# This is a notification - handle it
await self._handle_notification(message)
async def _handle_notification(self, message: WebSocketMessage):
"""Handle server notifications."""
logger.info(f"Received notification: {message.name}")
# In a real implementation, you'd dispatch to appropriate handlers
# based on the notification type
async def _send_request(self, request: WebSocketMessage) -> ResponseMessage:
"""Send a request and wait for the response."""
if not self.websocket:
raise ConnectionError("Not connected to server")
# Create a future to wait for the response
future = asyncio.Future()
self._pending_requests[request.request_id] = future
# Send the request
message_data = serialize_message(request)
await self.websocket.send(json.dumps(message_data))
logger.debug(f"Sent request: {request.name}")
# Wait for response (with timeout)
try:
response = await asyncio.wait_for(future, timeout=30.0)
return response
except asyncio.TimeoutError:
# Clean up the pending request
self._pending_requests.pop(request.request_id, None)
raise TimeoutError(f"Request {request.name} timed out")
async def authenticate(self, username: str, password: str) -> str:
"""Authenticate with the server and return the auth token."""
request = SignInRequest(
request_id=str(uuid.uuid4()), username=username, password=password
)
response = await self._send_request(request)
if isinstance(response, ErrorResponse):
raise ValueError(f"Authentication failed: {response.message}")
elif isinstance(response, DataTokenResponse):
self.token = response.data
logger.info("Successfully authenticated")
return self.token
else:
raise ValueError(f"Unexpected response type: {type(response)}")
async def create_game(
self,
map_name: str = "standard",
rules: List[str] = None,
power_name: Optional[str] = None,
n_controls: int = 7,
deadline: Optional[int] = None,
registration_password: Optional[str] = None,
) -> Dict[str, Any]:
"""Create a new game on the server."""
if not self.token:
raise ValueError("Must authenticate first")
if rules is None:
rules = ["NO_PRESS", "IGNORE_ERRORS", "POWER_CHOICE"]
request = CreateGameRequest(
request_id=str(uuid.uuid4()),
token=self.token,
map_name=map_name,
rules=rules,
power_name=power_name,
n_controls=n_controls,
deadline=deadline,
registration_password=registration_password,
)
response = await self._send_request(request)
if isinstance(response, ErrorResponse):
raise ValueError(f"Game creation failed: {response.message}")
elif isinstance(response, DataGameResponse):
game_data = response.data
self.game_id = game_data.get("game_id")
self.game_role = power_name or "OMNISCIENT"
logger.info(f"Created game {self.game_id} as {self.game_role}")
return game_data
else:
raise ValueError(f"Unexpected response type: {type(response)}")
async def join_game(
self,
game_id: str,
power_name: Optional[str] = None,
registration_password: Optional[str] = None,
) -> Dict[str, Any]:
"""Join an existing game."""
if not self.token:
raise ValueError("Must authenticate first")
request = JoinGameRequest(
request_id=str(uuid.uuid4()),
token=self.token,
game_id=game_id,
power_name=power_name,
registration_password=registration_password,
)
response = await self._send_request(request)
if isinstance(response, ErrorResponse):
raise ValueError(f"Game join failed: {response.message}")
elif isinstance(response, DataGameResponse):
game_data = response.data
self.game_id = game_id
self.game_role = power_name or "OBSERVER"
logger.info(f"Joined game {game_id} as {self.game_role}")
return game_data
else:
raise ValueError(f"Unexpected response type: {type(response)}")
async def list_games(
self,
game_id_filter: Optional[str] = None,
map_name: Optional[str] = None,
status: Optional[str] = None,
include_protected: bool = False,
) -> List[Dict[str, Any]]:
"""List available games on the server."""
if not self.token:
raise ValueError("Must authenticate first")
request = ListGamesRequest(
request_id=str(uuid.uuid4()),
token=self.token,
game_id_filter=game_id_filter,
map_name=map_name,
status=status,
include_protected=include_protected,
)
response = await self._send_request(request)
if isinstance(response, ErrorResponse):
raise ValueError(f"List games failed: {response.message}")
elif isinstance(response, DataGamesResponse):
return response.data
else:
raise ValueError(f"Unexpected response type: {type(response)}")
async def set_orders(
self, power_name: str, orders: List[str], phase: Optional[str] = None
) -> None:
"""Submit orders for a power."""
if not self.token or not self.game_id:
raise ValueError("Must authenticate and join game first")
request = SetOrdersRequest(
request_id=str(uuid.uuid4()),
token=self.token,
game_id=self.game_id,
game_role=power_name,
phase=phase,
orders=orders,
)
response = await self._send_request(request)
if isinstance(response, ErrorResponse):
raise ValueError(f"Set orders failed: {response.message}")
elif isinstance(response, OkResponse):
logger.info(f"Orders set for {power_name}: {orders}")
else:
raise ValueError(f"Unexpected response type: {type(response)}")
async def process_game(self, phase: Optional[str] = None) -> None:
"""Process the game (admin only)."""
if not self.token or not self.game_id:
raise ValueError("Must authenticate and join game first")
request = ProcessGameRequest(
request_id=str(uuid.uuid4()),
token=self.token,
game_id=self.game_id,
game_role=self.game_role or "MASTER",
phase=phase,
)
response = await self._send_request(request)
if isinstance(response, ErrorResponse):
raise ValueError(f"Process game failed: {response.message}")
elif isinstance(response, OkResponse):
logger.info("Game processed successfully")
else:
raise ValueError(f"Unexpected response type: {type(response)}")
async def get_all_possible_orders(
self, phase: Optional[str] = None
) -> Dict[str, List[str]]:
"""Get all possible orders for the current phase."""
if not self.token or not self.game_id:
raise ValueError("Must authenticate and join game first")
request = GetAllPossibleOrdersRequest(
request_id=str(uuid.uuid4()),
token=self.token,
game_id=self.game_id,
game_role=self.game_role or "OBSERVER",
phase=phase,
)
response = await self._send_request(request)
if isinstance(response, ErrorResponse):
raise ValueError(f"Get possible orders failed: {response.message}")
elif hasattr(response, "data"):
return response.data
else:
raise ValueError(f"Unexpected response type: {type(response)}")
async def close(self):
"""Close the WebSocket connection."""
if self.websocket:
await self.websocket.close()
logger.info("WebSocket connection closed")
# Example usage function
async def example_usage():
"""Demonstrate how to use the typed WebSocket client."""
client = TypedWebSocketDiplomacyClient()
try:
# Connect to server
await client.connect()
# Authenticate
token = await client.authenticate("player1", "password")
print(f"Authenticated with token: {token[:10]}...")
# List available games
games = await client.list_games()
print(f"Found {len(games)} games")
# Create a new game
game_data = await client.create_game(
power_name="FRANCE",
n_controls=1, # For testing
)
print(f"Created game: {game_data.get('game_id')}")
# Submit some orders
await client.set_orders("FRANCE", ["A PAR H", "F BRE H", "A MAR H"])
print("Orders submitted")
# Get possible orders
possible_orders = await client.get_all_possible_orders()
print(f"Possible orders: {len(possible_orders)} locations")
# Process game (if admin)
try:
await client.process_game()
print("Game processed")
except ValueError as e:
print(f"Could not process game: {e}")
except Exception as e:
print(f"Error: {e}")
finally:
await client.close()
# Convenience function for quick setup
async def connect_to_diplomacy_server(
username: str,
password: str,
hostname: str = "localhost",
port: int = 8432,
use_ssl: bool = False,
) -> TypedWebSocketDiplomacyClient:
"""
Convenience function to quickly connect to a Diplomacy server.
Args:
hostname: Server hostname
port: Server port
username: Username for authentication
password: Password for authentication
use_ssl: Whether to use SSL/TLS
Returns:
Connected and authenticated WebSocketDiplomacyClient
"""
client = TypedWebSocketDiplomacyClient(hostname, port, use_ssl)
await client.connect_and_authenticate(username, password)
return client
if __name__ == "__main__":
# Run the example
asyncio.run(example_usage())

View file

@ -1,201 +0,0 @@
"""
Simple example demonstrating how to use the WebSocketDiplomacyClient.
This script shows basic operations like connecting, creating/joining games,
and interacting with a Diplomacy server via WebSocket.
"""
import asyncio
from loguru import logger
from websocket_diplomacy_client import connect_to_diplomacy_server
async def basic_client_example():
"""
Basic example showing how to connect and interact with a Diplomacy server.
"""
try:
# Connect to the server
logger.info("Connecting to Diplomacy server...")
client = await connect_to_diplomacy_server(
hostname="localhost",
port=8432,
username="test_player",
password="test_password",
)
logger.info("Connected successfully!")
# List available games
logger.info("Listing available games...")
games = await client.list_games()
logger.info(f"Found {len(games)} games:")
for game in games:
logger.info(
f" Game {game.get('game_id', 'unknown')}: {game.get('status', 'unknown')} "
f"({game.get('n_players', 0)}/{game.get('n_controls', 0)} players)"
)
# Get available maps
logger.info("Getting available maps...")
maps = await client.get_available_maps()
logger.info(f"Available maps: {list(maps.keys())}")
# Create a new game
logger.info("Creating a new game...")
game = await client.create_game(
map_name="standard",
rules=["NO_PRESS", "IGNORE_ERRORS", "POWER_CHOICE"],
power_name="FRANCE", # Control France
n_controls=1, # Only need 1 player to start (for testing)
deadline=None, # No time pressure
)
logger.info(f"Created game {client.game_id} as {client.power_name}")
# Get current game state
logger.info("Getting current game state...")
state = client.get_state()
logger.info(f"Current phase: {client.get_current_phase()}")
logger.info(f"France has {len(client.get_units('FRANCE'))} units")
# Get possible orders
logger.info("Getting possible orders for France...")
possible_orders = client.get_all_possible_orders()
france_orders = possible_orders.get("FRANCE", [])
logger.info(f"France can make {len(france_orders)} possible orders")
if france_orders:
logger.info(f"First few orders: {france_orders[:5]}")
# Submit some orders (example: hold all units)
logger.info("Submitting hold orders for all French units...")
units = client.get_units("FRANCE")
hold_orders = []
for unit in units:
# Format: "A PAR H" means Army in Paris holds
hold_orders.append(f"{unit} H")
if hold_orders:
await client.set_orders("FRANCE", hold_orders)
logger.info(f"Submitted orders: {hold_orders}")
# Try to process the game (might fail if we don't have admin rights)
logger.info("Attempting to process the game...")
try:
await client.process_game()
logger.info("Game processed successfully")
# Synchronize to get updated state
await client.synchronize()
logger.info(
f"After processing - Current phase: {client.get_current_phase()}"
)
except Exception as e:
logger.warning(f"Could not process game (normal if not admin): {e}")
# Leave the game
logger.info("Leaving the game...")
await client.game.leave()
logger.info("Left the game successfully")
except Exception as e:
logger.error(f"Error in example: {e}", exc_info=True)
finally:
# Clean up
if "client" in locals():
await client.close()
logger.info("Example completed")
async def join_existing_game_example(game_id: str):
"""
Example showing how to join an existing game.
Args:
game_id: ID of the game to join
"""
try:
logger.info(f"Joining existing game {game_id}...")
client = await connect_to_diplomacy_server(
hostname="localhost",
port=8432,
username="test_player_2",
password="test_password",
)
# Join as an observer first
game = await client.join_game(game_id=game_id, power_name=None)
logger.info(f"Joined game {game_id} as observer")
# Get game state
state = client.get_state()
logger.info(f"Game phase: {client.get_current_phase()}")
logger.info(f"Game status: {game.status}")
# List powers and their status
for power_name, power in client.powers.items():
logger.info(
f"{power_name}: {len(power.centers)} centers, "
f"{len(power.units)} units, eliminated: {power.is_eliminated()}"
)
except Exception as e:
logger.error(f"Error joining game: {e}", exc_info=True)
finally:
if "client" in locals():
await client.close()
async def message_sending_example():
"""
Example showing how to send diplomatic messages.
"""
try:
client = await connect_to_diplomacy_server()
# Create a game with PRESS allowed
game = await client.create_game(
rules=[
"IGNORE_ERRORS",
"POWER_CHOICE",
], # Remove NO_PRESS to allow messages
power_name="FRANCE",
n_controls=1,
)
# Send a public message
await client.send_message(
sender="FRANCE",
recipient="GLOBAL",
message="Greetings from France! Let's have a fair game.",
)
logger.info("Sent public message")
# Send a private message (would need another power to be present)
try:
await client.send_message(
sender="FRANCE",
recipient="ENGLAND",
message="Hello England, shall we discuss an alliance?",
)
logger.info("Sent private message to England")
except Exception as e:
logger.info(f"Could not send private message: {e}")
except Exception as e:
logger.error(f"Error in messaging example: {e}", exc_info=True)
finally:
if "client" in locals():
await client.close()
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
# Join existing game if game ID provided as argument
game_id = sys.argv[1]
asyncio.run(join_existing_game_example(game_id))
else:
# Run basic example
asyncio.run(basic_client_example())

View file

@ -5,15 +5,18 @@ A simplified client wrapper for connecting to a Diplomacy server via WebSocket
and playing games remotely, designed as a drop-in replacement for direct Game() usage.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from typing import Dict, List, Optional, Any
from diplomacy.engine.game import Game
from loguru import logger
from diplomacy.client.connection import connect
from diplomacy.client.network_game import NetworkGame
from diplomacy.engine.message import Message
from diplomacy.utils.exceptions import DiplomacyException
from typed_websocket_client import TypedWebSocketDiplomacyClient
class WebSocketDiplomacyClient:
@ -432,7 +435,7 @@ async def connect_to_diplomacy_server(
hostname: str = "localhost",
port: int = 8432,
use_ssl: bool = False,
) -> TypedWebSocketDiplomacyClient:
) -> WebSocketDiplomacyClient:
"""
Convenience function to quickly connect to a Diplomacy server.
@ -446,6 +449,6 @@ async def connect_to_diplomacy_server(
Returns:
Connected and authenticated WebSocketDiplomacyClient
"""
client = TypedWebSocketDiplomacyClient(hostname, port, use_ssl)
client = WebSocketDiplomacyClient(hostname, port, use_ssl)
await client.connect_and_authenticate(username, password)
return client