add simplified prompts

This commit is contained in:
sam-paech 2025-06-27 14:42:05 +10:00
parent 0bd909b30b
commit ebf26cf8a6
33 changed files with 1762 additions and 143 deletions

View file

@ -5,6 +5,7 @@ from typing import Dict, List, Callable, Optional, Any, Set, Tuple
from diplomacy.engine.map import Map as GameMap
from diplomacy.engine.game import Game as BoardState
import logging
import re
# Placeholder for actual map type from diplomacy.engine.map.Map
# GameMap = Any
@ -15,78 +16,61 @@ logger = logging.getLogger(__name__)
def build_diplomacy_graph(game_map: GameMap) -> Dict[str, Dict[str, List[str]]]:
"""
Builds a graph where keys are SHORT province names (e.g., 'PAR', 'STP').
Adjacency lists also contain SHORT province names.
This graph is used for BFS pathfinding.
Return graph[PROV]['ARMY'|'FLEET'] = list of 3-letter neighbour provinces.
Works for dual-coast provinces by interrogating `abuts()` directly instead
of relying on loc_abut.
"""
graph: Dict[str, Dict[str, List[str]]] = {}
# Deriving a clean list of unique, 3-letter, uppercase short province names
# game_map.locs contains all locations, including coasts e.g. "STP/SC"
unique_short_names = set()
for loc in game_map.locs:
short_name = loc.split('/')[0][:3].upper() # Take first 3 chars and uppercase
if len(short_name) == 3: # Ensure it's a 3-letter name
unique_short_names.add(short_name)
all_short_province_names = sorted(list(unique_short_names))
# ── collect all 3-letter province codes ───────────────────────────────
provs: Set[str] = {
loc.split("/")[0][:3].upper() # 'BUL/EC' -> 'BUL'
for loc in game_map.locs
if len(loc.split("/")[0]) == 3
}
# Initialize graph with all valid short province names as keys
for province_name in all_short_province_names:
graph[province_name] = {'ARMY': [], 'FLEET': []}
graph: Dict[str, Dict[str, List[str]]] = {
p: {"ARMY": [], "FLEET": []} for p in provs
}
for province_short_source in all_short_province_names: # e.g. 'PAR', 'STP'
# Get all full names for this source province (e.g. 'STP' -> ['STP/NC', 'STP/SC', 'STP'])
full_names_for_source = game_map.loc_coasts.get(province_short_source, [province_short_source])
# ── helper: list every concrete variant of a province ─────────────────
def variants(code: str) -> List[str]:
lst = list(game_map.loc_coasts.get(code, []))
if code not in lst:
lst.append(code) # ensure base node included
return lst
for loc_full_source_variant in full_names_for_source: # e.g. 'STP/NC', then 'STP/SC', then 'STP'
# province_short_source is already the short name like 'STP'
# game_map.loc_abut provides general adjacencies, which might include specific coasts or lowercase names
for raw_adj_loc_from_loc_abut in game_map.loc_abut.get(province_short_source, []):
# Normalize this raw adjacent location to its short, uppercase form
adj_short_name_normalized = raw_adj_loc_from_loc_abut[:3].upper()
# ── populate adjacency by brute-force queries to `abuts()` ────────────
for src in provs:
src_vers = variants(src)
# Get all full names for this *normalized* adjacent short name (e.g. 'BUL' -> ['BUL/EC', 'BUL/SC', 'BUL'])
full_names_for_adj_dest = game_map.loc_coasts.get(adj_short_name_normalized, [adj_short_name_normalized])
for dest in provs:
if dest == src:
continue
dest_vers = variants(dest)
# Check for ARMY movement
unit_char_army = 'A'
if any(
game_map.abuts(
unit_char_army,
loc_full_source_variant, # Specific full source, e.g. 'STP/NC'
'-', # Order type for move
full_dest_variant # Specific full destination, e.g. 'MOS' or 'FIN'
)
for full_dest_variant in full_names_for_adj_dest
):
if adj_short_name_normalized not in graph[province_short_source]['ARMY']:
graph[province_short_source]['ARMY'].append(adj_short_name_normalized)
# ARMYonly bases count as the origin (armies cant sit on /EC)
if any(
game_map.abuts("A", src, "-", dv) # src is the base node
for dv in dest_vers
):
graph[src]["ARMY"].append(dest)
# FLEETany src variant that can host a fleet is valid
if any(
game_map.abuts("F", sv, "-", dv)
for sv in src_vers
for dv in dest_vers
):
graph[src]["FLEET"].append(dest)
# ── tidy up duplicates / order ---------------------------------------
for p in graph:
graph[p]["ARMY"] = sorted(set(graph[p]["ARMY"]))
graph[p]["FLEET"] = sorted(set(graph[p]["FLEET"]))
# Check for FLEET movement
unit_char_fleet = 'F'
if any(
game_map.abuts(
unit_char_fleet,
loc_full_source_variant, # Specific full source, e.g. 'STP/NC'
'-', # Order type for move
full_dest_variant # Specific full destination, e.g. 'BAR' or 'NWY'
)
for full_dest_variant in full_names_for_adj_dest
):
if adj_short_name_normalized not in graph[province_short_source]['FLEET']:
graph[province_short_source]['FLEET'].append(adj_short_name_normalized)
# Remove duplicates from adjacency lists (just in case)
for province_short in graph:
if 'ARMY' in graph[province_short]:
graph[province_short]['ARMY'] = sorted(list(set(graph[province_short]['ARMY'])))
if 'FLEET' in graph[province_short]:
graph[province_short]['FLEET'] = sorted(list(set(graph[province_short]['FLEET'])))
return graph
def bfs_shortest_path(
graph: Dict[str, Dict[str, List[str]]],
board_state: BoardState,
@ -241,33 +225,56 @@ def get_nearest_enemy_units(
def get_nearest_uncontrolled_scs(
game_map: GameMap,
board_state: BoardState,
graph: Dict[str, Dict[str, List[str]]],
power_name: str,
start_unit_loc_full: str,
start_unit_type: str,
n: int = 3
) -> List[Tuple[str, int, List[str]]]: # (sc_name_short, distance, path_short_names)
"""Finds up to N nearest SCs not controlled by power_name, sorted by path length."""
uncontrolled_sc_paths: List[Tuple[str, int, List[str]]] = []
game_map: GameMap,
board_state: BoardState,
graph: Dict[str, Dict[str, List[str]]],
power_name: str,
start_unit_loc_full: str,
start_unit_type: str,
n: int = 3,
) -> List[Tuple[str, int, List[str]]]:
"""
Return up to N nearest supply centres not controlled by `power_name`,
excluding centres that are the units own province (distance 0) or
adjacent in one move (distance 1).
all_scs_short = game_map.scs # This is a list of short province names that are SCs
Each tuple is (sc_code + ctrl_tag, distance, path_of_short_codes).
"""
results: List[Tuple[str, int, List[str]]] = []
for sc_short in game_map.scs: # all SC province codes
controller = get_sc_controller(game_map, board_state, sc_short)
if controller == power_name:
continue # already ours
# helper for BFS target test
def is_target(loc_short: str, _state: BoardState) -> bool:
return loc_short == sc_short
path = bfs_shortest_path(
graph,
board_state,
game_map,
start_unit_loc_full,
start_unit_type,
is_target,
)
if not path:
continue # unreachable
distance = len(path) - 1 # moves needed
# skip distance 0 (same province) and 1 (adjacent)
if distance <= 1:
continue
tag = f"{sc_short} (Ctrl: {controller or 'None'})"
results.append((tag, distance, path))
# sort by distance, then SC code for tie-breaks
results.sort(key=lambda x: (x[1], x[0]))
return results[:n]
for sc_loc_short in all_scs_short:
controller = get_sc_controller(game_map, board_state, sc_loc_short)
if controller != power_name:
def is_target_sc(loc_short: str, current_board_state: BoardState) -> bool:
return loc_short == sc_loc_short
path_short_names = bfs_shortest_path(graph, board_state, game_map, start_unit_loc_full, start_unit_type, is_target_sc)
if path_short_names:
# Path includes start, so distance is len - 1
uncontrolled_sc_paths.append((f"{sc_loc_short} (Ctrl: {controller or 'None'})", len(path_short_names) -1, path_short_names))
# Sort by distance (path length - 1), then by SC name for tie-breaking
uncontrolled_sc_paths.sort(key=lambda x: (x[1], x[0]))
return uncontrolled_sc_paths[:n]
def get_adjacent_territory_details(
game_map: GameMap,
@ -362,7 +369,7 @@ def get_adjacent_territory_details(
# --- Main context generation function ---
def generate_rich_order_context(game: Any, power_name: str, possible_orders_for_power: Dict[str, List[str]]) -> str:
def generate_rich_order_context_xml(game: Any, power_name: str, possible_orders_for_power: Dict[str, List[str]]) -> str:
"""
Generates a strategic overview context string.
Details units and SCs for power_name, including possible orders and simplified adjacencies for its units.
@ -456,3 +463,405 @@ def generate_rich_order_context(game: Any, power_name: str, possible_orders_for_
final_context_lines.append("</PossibleOrdersContext>")
return "\n".join(final_context_lines)
# ---------------------------------------------------------------------------
# Regex and tiny helpers
# ---------------------------------------------------------------------------
import re
from typing import Tuple, List, Dict, Optional, Any
# ── order-syntax matchers ─────────────────────────────────────────────────
_SIMPLE_MOVE_RE = re.compile(r"^[AF] [A-Z]{3}(?:/[A-Z]{2})? - [A-Z]{3}(?:/[A-Z]{2})?$")
_HOLD_RE = re.compile(r"^[AF] [A-Z]{3}(?:/[A-Z]{2})? H$") # NEW
_RETREAT_RE = re.compile(r"^[AF] [A-Z]{3}(?:/[A-Z]{2})? R [A-Z]{3}(?:/[A-Z]{2})?$")
_ADJUST_RE = re.compile(r"^[AF] [A-Z]{3}(?:/[A-Z]{2})? [BD]$") # build / disband
def _is_hold_order(order: str) -> bool: # NEW
return bool(_HOLD_RE.match(order.strip()))
def _norm_power(name: str) -> str:
"""Trim & uppercase for reliable comparisons."""
return name.strip().upper()
def _is_simple_move(order: str) -> bool:
return bool(_SIMPLE_MOVE_RE.match(order.strip()))
def _is_retreat_order(order: str) -> bool:
return bool(_RETREAT_RE.match(order.strip()))
def _is_adjust_order(order: str) -> bool:
return bool(_ADJUST_RE.match(order.strip()))
def _split_move(order: str) -> Tuple[str, str]:
"""Return ('A BUD', 'TRI') from 'A BUD - TRI' (validated move only)."""
unit_part, dest = order.split(" - ")
return unit_part.strip(), dest.strip()
# ---------------------------------------------------------------------------
# Gather *all* friendly support orders for a given move
# ---------------------------------------------------------------------------
def _all_support_examples(
mover: str,
dest: str,
all_orders: Dict[str, List[str]],
) -> List[str]:
"""
Return *every* order of the form 'A/F XYZ S <mover> - <dest>'
issued by our other units. Order of return is input order.
"""
target = f"{mover} - {dest}"
supports: List[str] = []
for loc, orders in all_orders.items():
if mover.endswith(loc):
continue # skip the moving unit itself
for o in orders:
if " S " in o and target in o:
supports.append(o.strip())
return supports
def _all_support_hold_examples(
holder: str,
all_orders: Dict[str, List[str]],
) -> List[str]:
"""
Return every order of the form 'A/F XYZ S <holder>' that supports
<holder> to HOLD, excluding the holding unit itself.
"""
target = f" S {holder}"
supports: List[str] = []
for loc, orders in all_orders.items():
if holder.endswith(loc): # skip the holding unit
continue
for o in orders:
if o.strip().endswith(target):
supports.append(o.strip())
return supports
# ---------------------------------------------------------------------------
# Province-type resolver (handles short codes, coasts, seas)
# ---------------------------------------------------------------------------
def _province_type_display(game_map, prov_short: str) -> str:
"""
Return 'LAND', 'COAST', or 'WATER' for the 3-letter province code.
Falls back to 'UNKNOWN' only if nothing matches.
"""
for full in game_map.loc_coasts.get(prov_short, [prov_short]):
t = game_map.loc_type.get(full)
if not t:
continue
t = t.upper()
if t in ("LAND", "L"):
return "LAND"
if t in ("COAST", "C"):
return "COAST"
if t in ("WATER", "SEA", "W"):
return "WATER"
return "UNKNOWN"
def _dest_occupancy_desc(
dest_short: str,
game_map,
board_state,
our_power: str,
) -> str:
""" '(occupied by X)', '(occupied by X — you!)', or '(unoccupied)' """
occupant: Optional[str] = None
for full in game_map.loc_coasts.get(dest_short, [dest_short]):
u = get_unit_at_location(board_state, full)
if u:
occupant = u.split(" ")[-1].strip("()")
break
if occupant is None:
return "(unoccupied)"
if occupant == our_power:
return f"(occupied by {occupant} — you!)"
return f"(occupied by {occupant})"
# ---------------------------------------------------------------------------
# Adjacent-territory lines (used by movement-phase builder)
# ---------------------------------------------------------------------------
def _adjacent_territory_lines(
graph,
game_map,
board_state,
unit_loc_full: str,
mover_descr: str,
our_power: str,
) -> List[str]:
lines: List[str] = []
indent1 = " "
indent2 = " "
unit_loc_short = game_map.loc_name.get(unit_loc_full, unit_loc_full)[:3]
mover_type_key = "ARMY" if mover_descr.startswith("A") else "FLEET"
adjacents = graph.get(unit_loc_short, {}).get(mover_type_key, [])
for adj in adjacents:
typ_display = _province_type_display(game_map, adj)
base_parts = [f"{indent1}{adj} ({typ_display})"]
sc_ctrl = get_sc_controller(game_map, board_state, adj)
if sc_ctrl:
base_parts.append(f"SC Control: {sc_ctrl}")
unit_here = None
for full in game_map.loc_coasts.get(adj, [adj]):
unit_here = get_unit_at_location(board_state, full)
if unit_here:
break
if unit_here:
base_parts.append(f"Units: {unit_here}")
lines.append(" ".join(base_parts))
# second analytical line if occupied
if unit_here:
pwr = unit_here.split(" ")[-1].strip("()")
if pwr == our_power:
friend_descr = unit_here.split(" (")[0]
lines.append(
f"{indent2}Support hold: {mover_descr} S {friend_descr}"
)
else:
lines.append(
f"{indent2}-> {unit_here} can support or contest {mover_descr}s moves and vice-versa"
)
return lines
# ---------------------------------------------------------------------------
# Movement-phase generator (UNCHANGED LOGIC)
# ---------------------------------------------------------------------------
def _generate_rich_order_context_movement(
game: Any,
power_name: str,
possible_orders_for_power: Dict[str, List[str]],
) -> str:
"""
Produce the <Territory > blocks *exactly* as before for movement phases.
"""
board_state = game.get_state()
game_map = game.map
graph = build_diplomacy_graph(game_map)
blocks: List[str] = []
me = _norm_power(power_name)
for unit_loc_full, orders in possible_orders_for_power.items():
unit_full_str = get_unit_at_location(board_state, unit_loc_full)
if not unit_full_str:
continue
unit_power = unit_full_str.split(" ")[-1].strip("()")
if _norm_power(unit_power) != me:
continue # Skip units that arent ours
mover_descr, _ = _split_move(
f"{unit_full_str.split(' ')[0]} {unit_loc_full} - {unit_loc_full}"
)
prov_short = game_map.loc_name.get(unit_loc_full, unit_loc_full)[:3]
prov_type_disp = _province_type_display(game_map, prov_short)
sc_tag = " (SC)" if prov_short in game_map.scs else ""
owner = get_sc_controller(game_map, board_state, unit_loc_full) or "None"
owner_line = (
f"Held by {owner} (You)" if owner == power_name else f"Held by {owner}"
)
ind = " "
block: List[str] = [f"<Territory {prov_short}>"]
block.append(f"{ind}({prov_type_disp}){sc_tag}")
block.append(f"{ind}{owner_line}")
block.append(f"{ind}Units present: {unit_full_str}")
# ----- Adjacent territories -----
block.append("# Adjacent territories:")
block.extend(
_adjacent_territory_lines(
graph, game_map, board_state,
unit_loc_full, mover_descr, power_name
)
)
# ----- Nearest enemy units -----
block.append("# Nearest units (not ours):")
enemies = get_nearest_enemy_units(
board_state, graph, game_map,
power_name, unit_loc_full,
"ARMY" if mover_descr.startswith("A") else "FLEET",
n=3,
)
for u, path in enemies:
path_disp = "".join([unit_loc_full] + path[1:])
block.append(f"{ind}{u}, path [{path_disp}]")
# ----- Nearest uncontrolled SCs -----
block.append("# Nearest supply centers (not controlled by us):")
scs = get_nearest_uncontrolled_scs(
game_map, board_state, graph,
power_name, unit_loc_full,
"ARMY" if mover_descr.startswith("A") else "FLEET",
n=3,
)
for sc_str, dist, sc_path in scs:
path_disp = "".join([unit_loc_full] + sc_path[1:])
sc_fmt = sc_str.replace("Ctrl:", "Controlled by")
block.append(f"{ind}{sc_fmt}, path [{path_disp}]")
# ----- Possible moves -----
block.append(f"# Possible {mover_descr} unit movements & supports:")
simple_moves = [o for o in orders if _is_simple_move(o)]
hold_orders = [o for o in orders if _is_hold_order(o)] # NEW
if not simple_moves and not hold_orders:
block.append(f"{ind}None")
else:
# ---- Moves (same behaviour as before) ----
for mv in simple_moves:
mover, dest = _split_move(mv)
occ = _dest_occupancy_desc(
dest.split("/")[0][:3], game_map, board_state, power_name
)
block.append(f"{ind}{mv} {occ}")
for s in _all_support_examples(mover, dest, possible_orders_for_power):
block.append(f"{ind*2}Available Support: {s}")
# ---- Holds (new) ----
for hd in hold_orders:
holder = hd.split(" H")[0] # e.g., 'F DEN'
block.append(f"{ind}{hd}")
for s in _all_support_hold_examples(holder, possible_orders_for_power):
block.append(f"{ind*2}Available Support: {s}")
block.append(f"</Territory {prov_short}>")
blocks.append("\n".join(block))
return "\n\n".join(blocks)
# ---------------------------------------------------------------------------
# Retreat-phase builder echo orders verbatim, no tags
# ---------------------------------------------------------------------------
def _generate_rich_order_context_retreat(
game: Any,
power_name: str,
possible_orders_for_power: Dict[str, List[str]],
) -> str:
"""
Flatten all retreat / disband orders into one list:
A PAR R PIC
A PAR D
F NTH R HEL
If the engine supplies nothing, return the standard placeholder.
"""
lines: List[str] = []
for orders in possible_orders_for_power.values():
for o in orders:
lines.append(o.strip())
return "\n".join(lines) if lines else "(No dislodged units)"
# ---------------------------------------------------------------------------
# Adjustment-phase builder summary line + orders, no WAIVEs, no tags
# ---------------------------------------------------------------------------
def _generate_rich_order_context_adjustment(
game: Any,
power_name: str,
possible_orders_for_power: Dict[str, List[str]],
) -> str:
"""
* First line states how many builds are allowed or disbands required.
* Echo every B/D order exactly as supplied, skipping WAIVE.
* No wrapper tags.
"""
board_state = game.get_state()
sc_owned = len(board_state.get("centers", {}).get(power_name, []))
units_num = len(board_state.get("units", {}).get(power_name, []))
delta = sc_owned - units_num # +ve ⇒ builds, -ve ⇒ disbands
# ----- summary line ----------------------------------------------------
if delta > 0:
summary = f"Builds available: {delta}"
elif delta < 0:
summary = f"Disbands required: {-delta}"
else:
summary = "No builds or disbands required"
# ----- collect orders (skip WAIVE) -------------------------------------
lines: List[str] = [summary]
for orders in possible_orders_for_power.values():
for o in orders:
if "WAIVE" in o.upper():
continue
lines.append(o.strip())
# If nothing but the summary, just return the summary.
return "\n".join(lines) if len(lines) > 1 else summary
# ---------------------------------------------------------------------------
# Phase-dispatch wrapper (public entry point)
# ---------------------------------------------------------------------------
def generate_rich_order_context(
game: Any,
power_name: str,
possible_orders_for_power: Dict[str, List[str]],
) -> str:
"""
Call the correct phase-specific builder.
* Movement phase output is IDENTICAL to the previous implementation.
* Retreat and Adjustment phases use the streamlined builders introduced
earlier.
"""
phase_type = game.current_short_phase[-1]
if phase_type == "M": # Movement
return _generate_rich_order_context_movement(
game, power_name, possible_orders_for_power
)
if phase_type == "R": # Retreat
return _generate_rich_order_context_retreat(
game, power_name, possible_orders_for_power
)
if phase_type == "A": # Adjustment (build / disband)
return _generate_rich_order_context_adjustment(
game, power_name, possible_orders_for_power
)
# Fallback treat unknown formats as movement
return _generate_rich_order_context_movement(
game, power_name, possible_orders_for_power
)