diff --git a/environments/community/ethereum_virtual_machine/README.md b/environments/community/ethereum_virtual_machine/README.md
new file mode 100644
index 00000000..cee312e4
--- /dev/null
+++ b/environments/community/ethereum_virtual_machine/README.md
@@ -0,0 +1,164 @@
+# Ethereum Virtual Machine (EVM) Transaction Agent Environment
+
+Atropos environment for training language models to generate and execute profitable Ethereum transactions. An active forked version of the blockchain is created using Anvil (https://getfoundry.sh/guides/forking-mainnet-with-cast-anvil) to allow for execution and state inspection to verify transactions execute and perform the desired action.
+
+## Overview
+
+This environment trains language models to become proficient at text to transaction for EVM blockchains. The existing config allows for ETH and ERC-20 transfers to be generated in natural language dynamically by LLM calls. These are designed to target different types of transactions with increasing frequency towards those transaction types that the model is scoring poorly on. The agent learns to handle ETH transfers, ERC-20 token transfers, and complex DeFi interactions through reinforcement learning.
+
+## Features
+
+- **Complete EVM Training Environment**: Full implementation of the BaseEnv interface for Atropos
+- **Anvil Blockchain Simulation**: Local Ethereum fork for safe transaction testing
+- **Multi-Token Support**: ETH and major ERC-20 tokens (USDC, USDT, DAI, WETH, CRV)
+- **Dynamic Question Generation**: LLM-powered generation of realistic transaction requests
+- **Comprehensive Scoring System**: Multi-dimensional evaluation of transaction correctness
+- **Adaptive Learning**: Performance-based question type selection for targeted improvement
+- **Robust Cleanup**: Graceful handling of interruptions and proper resource management
+
+## Files
+
+- **evm_server.py**: Main environment implementation with transaction scoring logic
+- **anvil.py**: Anvil blockchain backend management with integrated configuration
+- **configs/token_transfers.yaml**: Blockchain simulation configuration
+- **utils.py**: Cleanup handlers and utility functions
+
+## Transaction Types
+
+The environment trains on three primary transaction categories:
+
+1. **ETH Transfer**: Simple Ether transfers between addresses
+2. **ERC-20 Transfer (18 decimals)**: Standard token transfers (DAI, WETH, CRV)
+3. **ERC-20 Transfer (non-18 decimals)**: Tokens with different decimal precision (USDC, USDT)
+
+## Verified Scoring System with Anvil
+
+Unlike traditional RL environments that rely on simulated or estimated rewards, this environment provides **cryptographically verified rewards** by executing transactions on a real Ethereum Virtual Machine simulation powered by Anvil. This ensures that scoring is based on actual blockchain state changes rather than heuristic approximations.
+
+### Anvil-Powered Verification
+
+**Anvil** (Foundry's blockchain simulator) enables true verification by:
+
+- **Real EVM Execution**: Transactions run on an actual Ethereum Virtual Machine, not a simplified simulation
+- **Mainnet Fork**: Uses real mainnet state with actual token contracts and balances
+- **Cryptographic Verification**: Transaction success/failure is determined by EVM consensus rules
+- **Atomic State Management**: Blockchain snapshots ensure clean evaluation without side effects
+- **Gas Estimation**: Real gas consumption and fee calculation for realistic training
+
+### Scoring Methodology
+
+The environment employs a **snapshot-execute-verify-revert** cycle for each transaction:
+
+```
+1. Snapshot blockchain state
+2. Record pre-execution balances
+3. Execute agent's transaction
+4. Measure actual state changes
+5. Calculate verified score
+6. Revert to clean snapshot
+```
+
+This process ensures that:
+- ✅ **No False Positives**: Only correctly executed transactions receive rewards
+- ✅ **Precise Measurement**: Exact balance changes are measured, not estimated
+- ✅ **Isolated Evaluation**: Each transaction is evaluated independently
+- ✅ **Real-World Validity**: Successful transactions would work on actual mainnet
+
+### Five-Dimensional Scoring
+
+The reward function evaluates transactions across five verified dimensions:
+
+1. **Correct Balance Changes (0.5 points)**:
+ - **Most Critical Component**: Measures actual on-chain balance differences
+ - Compares pre/post execution balances with cryptographic precision
+ - For ETH: Exact wei amounts transferred to destination
+ - For ERC-20: Exact token units transferred (accounting for decimals)
+ - Verified against real contract state, not estimated
+
+2. **Successful Execution (0.3 points)**:
+ - Verified by EVM status code (`0x1` = success)
+ - Ensures transaction doesn't revert due to insufficient funds, gas, or logic errors
+ - Only awarded if transaction is mined successfully
+
+3. **Thinking Quality (±0.1 points)**:
+4. **Destination Address Accuracy (0.05 points)**:
+5. **Data Field Correctness (0.05 points)**:
+
+**Total Score Range**: -0.2 to 1.0
+- **Perfect execution**: 1.0 (all components correct)
+- **Missing thinking**: -0.2 (penalty for unexplained decisions)
+- **Partial success**: Proportional scoring based on verified components
+
+## Prerequisites
+
+### System Requirements
+- Python 3.8+
+- [Foundry](https://book.getfoundry.sh/) (includes Anvil and Cast)
+- OpenAI API key
+
+### Installing Foundry/Anvil
+
+**Quick Install (Recommended)**
+```bash
+curl -L https://foundry.paradigm.xyz | bash
+foundryup
+```
+
+**Verify Installation:**
+```bash
+anvil --version
+cast --version
+forge --version
+```
+
+## Setup
+
+1. **Install Python dependencies:**
+ ```bash
+ pip install openai pydantic PyYAML
+ ```
+
+2. **Set OpenAI API key:**
+ ```bash
+ export OPENAI_API_KEY="your-api-key-here"
+ ```
+
+3. **Verify configuration:**
+ ```bash
+ python -c "from anvil import AnvilConfig; config = AnvilConfig(); print('Config loaded successfully')"
+ ```
+
+## Usage
+
+### Running the Environment
+
+**For inference-only rollouts:**
+```bash
+cd environments/community/ethereum_virtual_machine/
+python evm_server.py process \
+ --env.data_path_to_save_groups evm_rollouts.jsonl \
+ --openai.model_name gpt-4o-mini
+```
+
+**For full training with server:**
+```bash
+python evm_server.py serve
+```
+
+### Configuration
+
+The environment uses `configs/token_transfers.yaml` for blockchain configuration:
+
+- **Network Settings**: Port (8545), chain ID, block time
+- **Fork Configuration**: Mainnet fork at specific block
+- **Wallet Setup**: Custom wallet funding and token swaps
+- **Gas Settings**: Limit and price configuration
+- **Token Addresses**: Whitelisted ERC-20 tokens
+
+## Potential Training Applications
+
+- **DeFi Agent Development**: Training models for decentralized finance interactions
+- **Transaction Automation**: Building agents for routine blockchain operations
+- **Smart Contract Interaction**: Learning to encode function calls and parameters
+- **Risk Assessment**: Understanding transaction costs and failure modes
+- **Multi-Chain Operations**: Foundation for cross-chain transaction agents
diff --git a/environments/community/ethereum_virtual_machine/anvil.py b/environments/community/ethereum_virtual_machine/anvil.py
new file mode 100644
index 00000000..bf7f5a0d
--- /dev/null
+++ b/environments/community/ethereum_virtual_machine/anvil.py
@@ -0,0 +1,765 @@
+"""Anvil blockchain simulation backend with integrated configuration.
+
+This module provides a complete interface for managing Anvil (Foundry's local Ethereum node)
+with integrated YAML configuration loading.
+"""
+
+from __future__ import annotations
+
+import atexit
+import logging
+import signal
+import subprocess
+import time
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+import requests
+import yaml
+
+# Set up anvil logger to write to anvil.log
+anvil_logger = logging.getLogger("anvil")
+anvil_logger.setLevel(logging.INFO)
+anvil_logger.propagate = False
+
+# Create file handler for anvil.log
+if not anvil_logger.handlers:
+ file_handler = logging.FileHandler("anvil.log")
+ file_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
+ file_handler.setFormatter(file_formatter)
+ anvil_logger.addHandler(file_handler)
+
+
+class ConfigDict:
+ """Helper class to provide dot-notation access to configuration dictionaries."""
+
+ def __init__(self, data: Dict[str, Any]):
+ for key, value in data.items():
+ if isinstance(value, dict):
+ setattr(self, key, ConfigDict(value))
+ else:
+ setattr(self, key, value)
+
+ def __getitem__(self, key):
+ return getattr(self, key)
+
+ def __contains__(self, key):
+ return hasattr(self, key)
+
+ def get(self, key, default=None):
+ return getattr(self, key, default)
+
+
+class AnvilConfig:
+ """Configuration loader for Anvil EVM environment."""
+
+ def __init__(self, config_file: str = "configs/token_transfers.yaml"):
+ self.config_file = Path(__file__).parent / config_file
+ self._raw_config = self._load_config()
+
+ # Create dot-notation accessible config sections
+ self.anvil = ConfigDict(
+ self._raw_config.get("network", {})
+ ) # Renamed from 'network' to 'anvil'
+ self.timeouts = ConfigDict(self._raw_config.get("timeouts", {}))
+ self.funding = ConfigDict(self._raw_config.get("funding", {}))
+ self.whitelisted_tokens = ConfigDict(
+ self._raw_config.get("whitelisted_tokens", {})
+ )
+ self.defi = ConfigDict(self._raw_config.get("defi", {}))
+ self.swaps = ConfigDict(self._raw_config.get("swaps", {}))
+
+ def _load_config(self) -> Dict[str, Any]:
+ """Load configuration from YAML file."""
+ try:
+ with open(self.config_file, "r") as f:
+ config = yaml.safe_load(f)
+ return config
+ except FileNotFoundError:
+ raise FileNotFoundError(f"Configuration file not found: {self.config_file}")
+ except yaml.YAMLError as e:
+ raise ValueError(f"Error parsing configuration file: {e}")
+
+ # Helper Methods
+ def get_rpc_url(self) -> str:
+ """Get the full RPC URL for the Anvil instance."""
+ return f"http://127.0.0.1:{self.anvil.port}"
+
+ def get_anvil_startup_command(
+ self, port: int = None, fork_url: str = None
+ ) -> list[str]:
+ """Get the Anvil startup command with specified or default parameters."""
+ cmd = ["anvil", "--port", str(port or self.anvil.port)]
+ if fork_url or self.anvil.fork_url:
+ cmd += ["--fork-url", fork_url or self.anvil.fork_url]
+ return cmd
+
+
+class AnvilBackend:
+ """Anvil-specific blockchain simulation backend."""
+
+ def __init__(
+ self,
+ config: AnvilConfig,
+ port: Optional[int] = None,
+ fork_url: Optional[str] = None,
+ log_file: Optional[str] = None,
+ ) -> None:
+ self.config = config
+ self.port = port or config.anvil.port
+ self.fork_url = fork_url or config.anvil.fork_url
+ self.log_file = log_file or config.anvil.log_file
+ self._proc: Optional[subprocess.Popen[str]] = None
+ self._is_wallet_setup = False
+ self.rpc_url = f"http://127.0.0.1:{self.port}"
+
+ # Register cleanup handlers
+ self._setup_cleanup_handlers()
+
+ def _setup_cleanup_handlers(self):
+ """Setup cleanup handlers for various exit scenarios"""
+ # Register cleanup function to run on normal exit
+ atexit.register(self._cleanup_process)
+
+ # Register signal handlers for graceful shutdown
+ signal.signal(signal.SIGINT, self._signal_handler) # Ctrl+C
+ signal.signal(signal.SIGTERM, self._signal_handler) # Termination signal
+
+ # On Windows, also handle SIGBREAK
+ if hasattr(signal, "SIGBREAK"):
+ signal.signal(signal.SIGBREAK, self._signal_handler)
+
+ def _signal_handler(self, signum, frame):
+ """Handle shutdown signals gracefully"""
+ anvil_logger.info(
+ f"Received signal {signum}, shutting down Anvil gracefully..."
+ )
+ self._cleanup_process()
+
+ def _cleanup_process(self):
+ """Clean up Anvil process"""
+ if self._proc and self._proc.poll() is None:
+ try:
+ anvil_logger.info("Terminating Anvil process...")
+ self._proc.terminate()
+ try:
+ self._proc.wait(timeout=5)
+ anvil_logger.info("Anvil process terminated gracefully")
+ except subprocess.TimeoutExpired:
+ anvil_logger.warning(
+ "Anvil didn't terminate gracefully, killing process"
+ )
+ self._proc.kill()
+ self._proc.wait()
+ anvil_logger.info("Anvil process killed")
+ except Exception as e:
+ anvil_logger.error(f"Error during Anvil cleanup: {e}")
+ finally:
+ self._proc = None
+
+ def start(self) -> None:
+ """Start the Anvil process."""
+ if self._proc is not None and self._proc.poll() is None:
+ anvil_logger.info("Anvil is already running")
+ return # already running
+
+ cmd = self.config.get_anvil_startup_command(self.port, self.fork_url)
+
+ # Open log file for anvil output
+ log_path = Path(self.log_file)
+ log_path.parent.mkdir(parents=True, exist_ok=True)
+
+ with open(log_path, "w") as log_f:
+ log_f.write(f"=== Anvil started at port {self.port} ===\n")
+ log_f.write(f"Command: {' '.join(cmd)}\n")
+ log_f.write("=" * 50 + "\n")
+
+ # spawn detached so we can ctrl-c main program without killing anvil
+ self._proc = subprocess.Popen(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ text=True,
+ )
+
+ # wait until RPC ready and log output
+ started = False
+ with open(log_path, "a") as log_f:
+ for i in range(self.config.timeouts.anvil_startup_lines):
+ line = self._proc.stdout.readline() # type: ignore
+ if line:
+ log_f.write(line)
+ log_f.flush() # Ensure immediate write
+ if "Listening on" in line or "JSON-RPC server started" in line:
+ started = True
+ break
+ else:
+ # No more output, break early
+ break
+ if not started:
+ anvil_logger.error("Failed to launch anvil; did you run the setup script?")
+ raise RuntimeError("Failed to launch anvil; did you run the setup script?")
+
+ def stop(self) -> None:
+ """Stop the Anvil process."""
+ self._cleanup_process()
+
+ def get_rpc_url(self) -> str:
+ """Get the RPC URL for this Anvil instance."""
+ return self.rpc_url
+
+ def execute_transaction(self, tx_obj: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Execute transaction using cast command.
+
+ Args:
+ tx_obj: Transaction object from agent (e.g., {"to": "0x...", "value": "0.5", "data": "0x"})
+
+ Returns:
+ Dict with success, gas_used, output, tx_hash, error
+ """
+ try:
+ # Extract transaction fields
+ to_address = tx_obj.get("to", "")
+ value = tx_obj.get("value", "0")
+ data = tx_obj.get("data", "0x")
+
+ # Convert hex value to decimal for cast
+ if isinstance(value, str) and value.startswith("0x"):
+ try:
+ value_decimal = str(int(value, 16))
+ except ValueError:
+ value_decimal = "0"
+ else:
+ value_decimal = str(value)
+
+ # Build cast command - different approaches based on whether we have data
+ if data and data != "0x" and len(data) > 2:
+ # Transaction with data (contract interaction) - pass raw hex data as sig parameter
+ cmd = [
+ "cast",
+ "send",
+ to_address,
+ data, # Raw hex data as the sig parameter (selector + encoded calldata)
+ "--from",
+ self.config.funding.custom_wallet,
+ "--unlocked",
+ "--value",
+ value_decimal,
+ "--rpc-url",
+ self.get_rpc_url(),
+ ]
+ else:
+ # Simple ETH transfer
+ cmd = [
+ "cast",
+ "send",
+ to_address,
+ "--from",
+ self.config.funding.custom_wallet,
+ "--unlocked",
+ "--value",
+ value_decimal,
+ "--rpc-url",
+ self.get_rpc_url(),
+ ]
+
+ # Execute cast command
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ timeout=self.config.timeouts.cast_command,
+ )
+
+ # Parse result
+ if result.returncode == 0:
+ # Success - extract transaction hash and get receipt
+ tx_hash = result.stdout.strip()
+ gas_used = self._get_gas_used(tx_hash)
+
+ return {
+ "success": True,
+ "status": "0x1", # Success status for scoring
+ "gas_used": gas_used,
+ "tx_hash": tx_hash,
+ "output": result.stdout,
+ }
+ else:
+ # Failure - parse error
+ error_msg = result.stderr.strip() or result.stdout.strip()
+ return {
+ "success": False,
+ "status": "0x0", # Failure status for scoring
+ "gas_used": 0,
+ "error": error_msg,
+ "output": result.stderr + result.stdout,
+ }
+
+ except subprocess.TimeoutExpired:
+ return {
+ "success": False,
+ "status": "0x0",
+ "gas_used": 0,
+ "error": "Transaction timeout",
+ "output": "cast command timed out",
+ }
+ except Exception as e:
+ anvil_logger.error(f"Exception in execute_transaction: {str(e)}")
+ return {
+ "success": False,
+ "status": "0x0",
+ "gas_used": 0,
+ "error": str(e),
+ "output": f"Failed to execute cast: {str(e)}",
+ }
+
+ def setup_wallet(self, wallet_address: Optional[str] = None) -> None:
+ """Setup custom wallet by impersonating it and funding with ETH."""
+ if self._is_wallet_setup:
+ return # Already setup
+
+ wallet = wallet_address or self.config.funding.custom_wallet
+
+ try:
+ # Impersonate the custom wallet using cast command
+ result = subprocess.run(
+ [
+ "cast",
+ "rpc",
+ "anvil_impersonateAccount",
+ wallet,
+ "--rpc-url",
+ self.get_rpc_url(),
+ ],
+ capture_output=True,
+ text=True,
+ timeout=self.config.timeouts.cast_command,
+ )
+
+ if result.returncode != 0:
+ anvil_logger.error(f"Failed to impersonate wallet: {result.stderr}")
+ raise RuntimeError(f"Failed to impersonate wallet: {result.stderr}")
+
+ # Add buffer time
+ time.sleep(self.config.timeouts.wallet_setup_buffer)
+
+ # Fund the custom wallet with ETH from Anvil account 0
+ result = subprocess.run(
+ [
+ "cast",
+ "send",
+ wallet,
+ "--private-key",
+ self.config.funding.anvil_private_key_0,
+ "--value",
+ self.config.funding.initial_funding_amount,
+ "--rpc-url",
+ self.get_rpc_url(),
+ ],
+ capture_output=True,
+ text=True,
+ timeout=self.config.timeouts.cast_command,
+ )
+
+ if result.returncode != 0:
+ anvil_logger.error(f"Failed to fund custom wallet: {result.stderr}")
+ raise RuntimeError(f"Failed to fund custom wallet: {result.stderr}")
+
+ # Add buffer time before starting swaps
+ time.sleep(self.config.timeouts.wallet_setup_buffer)
+
+ # Perform initial token swaps to diversify the wallet
+ self._perform_initial_swaps()
+
+ self._is_wallet_setup = True
+
+ except Exception as e:
+ anvil_logger.error(f"Error setting up custom wallet: {str(e)}")
+ raise
+
+ def snapshot(self) -> str:
+ """Take a snapshot of the current blockchain state."""
+ return self._rpc("evm_snapshot")
+
+ def revert(self, snap_id: str) -> None:
+ """Revert to a previous snapshot."""
+ self._rpc("evm_revert", [snap_id])
+
+ # Private helper methods
+ def _rpc(self, method: str, params: Optional[List[Any]] = None) -> Any:
+ """Make an RPC call to Anvil."""
+ import json as _json
+ from urllib import request
+
+ payload = {
+ "jsonrpc": "2.0",
+ "id": 1,
+ "method": method,
+ "params": params or [],
+ }
+ data = _json.dumps(payload).encode()
+ req = request.Request(
+ self.get_rpc_url(), data=data, headers={"Content-Type": "application/json"}
+ )
+ resp = request.urlopen(req)
+ result = _json.loads(resp.read())
+ if "error" in result:
+ raise RuntimeError(result["error"])
+ return result["result"]
+
+ def _get_gas_used(self, tx_hash: str) -> int:
+ """Get gas used from transaction receipt using cast."""
+ try:
+ result = subprocess.run(
+ [
+ "cast",
+ "receipt",
+ tx_hash,
+ "--field",
+ "gasUsed",
+ "--rpc-url",
+ self.get_rpc_url(),
+ ],
+ capture_output=True,
+ text=True,
+ timeout=self.config.timeouts.cast_command,
+ )
+ if result.returncode == 0:
+ return int(result.stdout.strip(), 16) # Convert hex to int
+ except Exception:
+ pass
+ return 0 # Default if we can't get gas info
+
+ def _perform_initial_swaps(self):
+ """Perform initial token swaps to give the wallet a diverse portfolio."""
+ # Get token configuration from config
+ tokens = self.config.whitelisted_tokens
+
+ # Amount to swap for each token
+ swap_amount = self.config.swaps.initial_swap_amount
+
+ # Swap for all whitelisted tokens from config
+ for token_name in tokens.__dict__.keys():
+ try:
+ token_info = getattr(tokens, token_name)
+
+ # Try direct RPC approach
+ success = self._execute_swap_direct(
+ token_name, token_info.address, swap_amount
+ )
+
+ if success:
+ # Check token balance after swap
+ self._check_token_balance(
+ token_name, token_info.address, token_info.decimals
+ )
+
+ # Add buffer between swaps
+ time.sleep(self.config.timeouts.operation_buffer)
+
+ except Exception as e:
+ anvil_logger.warning(f"Error swapping ETH for {token_name}: {str(e)}")
+ continue
+
+ def _check_token_balance(self, token_name: str, token_address: str, decimals: int):
+ """Check and log the balance of a specific token."""
+ try:
+ balance_result = subprocess.run(
+ [
+ "cast",
+ "call",
+ token_address,
+ "balanceOf(address)(uint256)",
+ self.config.funding.custom_wallet,
+ "--rpc-url",
+ self.get_rpc_url(),
+ ],
+ capture_output=True,
+ text=True,
+ timeout=self.config.timeouts.cast_command,
+ )
+
+ if balance_result.returncode == 0:
+ balance_output = balance_result.stdout.strip()
+ if balance_output:
+ # Parse the balance - cast call returns decimal, not hex
+ # Handle format like "26432331438 [2.643e10]"
+ balance_str = balance_output.split()[
+ 0
+ ] # Take first part before any brackets
+ balance_raw = int(balance_str)
+ balance_formatted = balance_raw / (10**decimals)
+ anvil_logger.info(
+ f"✓ {token_name} balance: {balance_formatted:.6f} {token_name}"
+ )
+ else:
+ anvil_logger.warning(
+ f"Empty response when checking {token_name} balance"
+ )
+ else:
+ anvil_logger.warning(f"Failed to check {token_name} balance")
+ except Exception as e:
+ anvil_logger.warning(f"Error checking {token_name} balance: {str(e)}")
+
+ def _direct_rpc_call(
+ self, method: str, params: Optional[List] = None
+ ) -> Dict[str, Any]:
+ """Make a direct RPC call to Anvil using HTTP requests."""
+ try:
+ payload = {
+ "jsonrpc": "2.0",
+ "id": 1,
+ "method": method,
+ "params": params or [],
+ }
+
+ response = requests.post(
+ self.get_rpc_url(),
+ json=payload,
+ timeout=self.config.timeouts.rpc,
+ headers={"Content-Type": "application/json"},
+ )
+
+ if response.status_code == 200:
+ result = response.json()
+
+ if "error" in result:
+ return {"success": False, "error": result["error"]}
+ else:
+ return {"success": True, "result": result.get("result")}
+ else:
+ return {
+ "success": False,
+ "error": f"HTTP {response.status_code}: {response.text}",
+ }
+
+ except requests.exceptions.Timeout:
+ return {"success": False, "error": "RPC timeout"}
+ except Exception as e:
+ return {"success": False, "error": str(e)}
+
+ def _execute_swap_direct(
+ self, token_name: str, token_address: str, swap_amount: str
+ ) -> bool:
+ """Execute swap using direct RPC calls instead of subprocess."""
+ try:
+ # Get configuration values
+ uniswap_router = self.config.defi.uniswap_v3_router
+ weth_address = self.config.defi.weth_address
+
+ # Create deadline
+ deadline = hex(
+ int(time.time()) + self.config.timeouts.transaction_deadline_offset
+ )
+
+ # Function selector for exactInputSingle
+ function_selector = self.config.swaps.uniswap_exact_input_single_selector
+
+ # Convert addresses to 32-byte hex (pad with zeros)
+ token_in_padded = weth_address.lower().replace("0x", "").zfill(64)
+ token_out_padded = token_address.lower().replace("0x", "").zfill(64)
+ fee_padded = hex(self.config.defi.default_uniswap_fee)[2:].zfill(64)
+ recipient_padded = (
+ self.config.funding.custom_wallet.lower().replace("0x", "").zfill(64)
+ )
+ deadline_padded = deadline[2:].zfill(64)
+ amount_in_padded = hex(int(swap_amount))[2:].zfill(64)
+ amount_out_min_padded = "0".zfill(64) # 0 minimum out
+ sqrt_price_limit_padded = "0".zfill(64) # 0 price limit
+
+ # Construct the full calldata
+ calldata = (
+ function_selector
+ + token_in_padded
+ + token_out_padded
+ + fee_padded
+ + recipient_padded
+ + deadline_padded
+ + amount_in_padded
+ + amount_out_min_padded
+ + sqrt_price_limit_padded
+ )
+
+ # Prepare transaction parameters
+ tx_params = {
+ "from": self.config.funding.custom_wallet.lower(),
+ "to": uniswap_router.lower(),
+ "value": hex(int(swap_amount)),
+ "data": calldata,
+ }
+
+ # Send the transaction via RPC
+ result = self._direct_rpc_call("eth_sendTransaction", [tx_params])
+
+ if result["success"]:
+ tx_hash = result["result"]
+
+ # Mine a block to include the transaction (Anvil in fork mode doesn't auto-mine)
+ mine_result = self._direct_rpc_call("evm_mine")
+ if not mine_result["success"]:
+ return False
+
+ # Check the transaction receipt
+ receipt_result = self._direct_rpc_call(
+ "eth_getTransactionReceipt", [tx_hash]
+ )
+ if receipt_result["success"] and receipt_result["result"]:
+ receipt = receipt_result["result"]
+ if receipt.get("status") == "0x1":
+ return True
+ else:
+ return False
+ else:
+ return False
+ else:
+ return False
+
+ except Exception as e:
+ anvil_logger.warning(f"Error in {token_name} swap: {str(e)}")
+ return False
+
+ def get_wallet_balances(
+ self, wallet_address: Optional[str] = None, tokens: Optional[List[str]] = None
+ ) -> Dict[str, Any]:
+ """
+ Get wallet balances for specified tokens or default set.
+
+ Args:
+ wallet_address: Address to check balances for (defaults to custom wallet)
+ tokens: List of token symbols to check (defaults to ETH + whitelisted tokens)
+
+ Returns:
+ Dict with token symbols as keys and balance info as values
+ """
+ wallet = wallet_address or self.config.funding.custom_wallet
+
+ # Default to ETH + whitelisted tokens if none specified
+ if tokens is None:
+ tokens = ["ETH"] + list(self.config.whitelisted_tokens.__dict__.keys())
+
+ balances = {}
+
+ for token_symbol in tokens:
+ try:
+ if token_symbol.upper() == "ETH":
+ # Get ETH balance using RPC call
+ result = self._direct_rpc_call("eth_getBalance", [wallet, "latest"])
+ if result["success"]:
+ balance_wei = int(result["result"], 16) # Convert hex to int
+ balance_eth = balance_wei / 10**18
+ balances["ETH"] = {
+ "symbol": "ETH",
+ "balance": balance_eth,
+ "balance_wei": str(balance_wei),
+ "decimals": 18,
+ }
+ else:
+ balances["ETH"] = {
+ "symbol": "ETH",
+ "balance": 0,
+ "error": result.get("error", "Unknown error"),
+ }
+
+ else:
+ # Get ERC-20 token balance using existing token check pattern
+ token_info = getattr(
+ self.config.whitelisted_tokens, token_symbol, None
+ )
+ if token_info is None:
+ balances[token_symbol] = {
+ "symbol": token_symbol,
+ "balance": 0,
+ "error": "Token not found in config",
+ }
+ continue
+
+ # Use existing cast command execution pattern
+ balance_result = self._execute_cast_command(
+ [
+ "cast",
+ "call",
+ token_info.address,
+ "balanceOf(address)(uint256)",
+ wallet,
+ "--rpc-url",
+ self.get_rpc_url(),
+ ]
+ )
+
+ if balance_result["success"]:
+ balance_output = balance_result["output"].strip()
+ if balance_output:
+ # Parse the balance using existing pattern from _check_token_balance
+ balance_str = balance_output.split()[
+ 0
+ ] # Take first part before any brackets
+ balance_raw = int(balance_str)
+ balance_formatted = balance_raw / (10**token_info.decimals)
+ balances[token_symbol] = {
+ "symbol": token_symbol,
+ "balance": balance_formatted,
+ "balance_raw": balance_raw,
+ "decimals": token_info.decimals,
+ "address": token_info.address,
+ }
+ else:
+ balances[token_symbol] = {
+ "symbol": token_symbol,
+ "balance": 0,
+ "error": "Empty balance response",
+ }
+ else:
+ balances[token_symbol] = {
+ "symbol": token_symbol,
+ "balance": 0,
+ "error": balance_result.get("error", "Cast command failed"),
+ }
+
+ except Exception as e:
+ balances[token_symbol] = {
+ "symbol": token_symbol,
+ "balance": 0,
+ "error": str(e),
+ }
+ anvil_logger.error(
+ f"Exception getting {token_symbol} balance: {str(e)}"
+ )
+
+ return balances
+
+ def _execute_cast_command(self, cmd: List[str]) -> Dict[str, Any]:
+ """
+ Execute a cast command and return standardized result.
+ Reuses existing patterns for cast command execution.
+ """
+ try:
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ timeout=self.config.timeouts.cast_command,
+ )
+
+ if result.returncode == 0:
+ return {"success": True, "output": result.stdout, "error": None}
+ else:
+ return {
+ "success": False,
+ "output": result.stdout,
+ "error": result.stderr.strip() or result.stdout.strip(),
+ }
+
+ except subprocess.TimeoutExpired:
+ return {"success": False, "output": "", "error": "Command timeout"}
+ except Exception as e:
+ return {"success": False, "output": "", "error": str(e)}
+
+ def __enter__(self):
+ """Context manager entry"""
+ self.start()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """Context manager exit"""
+ self.stop()
diff --git a/environments/community/ethereum_virtual_machine/configs/token_transfers.yaml b/environments/community/ethereum_virtual_machine/configs/token_transfers.yaml
new file mode 100644
index 00000000..75b85e7d
--- /dev/null
+++ b/environments/community/ethereum_virtual_machine/configs/token_transfers.yaml
@@ -0,0 +1,109 @@
+# Anvil Configuration
+
+# ============================================================================
+# ANVIL NETWORK CONFIGURATION
+# ============================================================================
+network:
+ port: 8545
+ fork_url: "https://reth-ethereum.ithaca.xyz/rpc"
+ log_file: "anvil.log"
+
+# ============================================================================
+# TIMEOUT CONFIGURATION
+# ============================================================================
+timeouts:
+ # Subprocess timeout for cast commands (seconds)
+ cast_command: 30
+
+ # RPC request timeout (seconds)
+ rpc: 30
+
+ # Anvil startup timeout (number of output lines to read)
+ anvil_startup_lines: 100
+
+ # Anvil shutdown timeout (seconds)
+ anvil_shutdown: 5
+
+ # Buffer time between operations (seconds)
+ operation_buffer: 2
+ wallet_setup_buffer: 3
+
+ # Transaction deadline offset (seconds from now)
+ transaction_deadline_offset: 3600 # 1 hour
+
+# ============================================================================
+# WALLET SEED FUNDING
+# ============================================================================
+funding:
+ # Anvil seed account (Available Account 0) - used for funding custom wallet
+ anvil_account_0: "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"
+ anvil_private_key_0: "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"
+ custom_wallet: "0xcA4B9c26111Aacf982d85c4DE1bEBB7AeD2ffaa7"
+ initial_funding_amount: "1000000000000000000000" # 1000 ETH
+
+# ============================================================================
+# WHITELISTED TOKENS
+# ============================================================================
+whitelisted_tokens:
+ USDC:
+ address: "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
+ decimals: 6
+ USDT:
+ address: "0xdAC17F958D2ee523a2206206994597C13D831ec7"
+ decimals: 6
+ WBTC:
+ address: "0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599"
+ decimals: 8
+ DAI:
+ address: "0x6B175474E89094C44Da98b954EedeAC495271d0F"
+ decimals: 18
+ LINK:
+ address: "0x514910771AF9Ca656af840dff83E8264EcF986CA"
+ decimals: 18
+ CRV:
+ address: "0xD533a949740bb3306d119CC777fa900bA034cd52"
+ decimals: 18
+ UNI:
+ address: "0x1f9840a85d5aF5bf1D1762F925BDADdC4201F984"
+ decimals: 18
+ LDO:
+ address: "0x5A98FcBEA516Cf06857215779Fd812CA3beF1B32"
+ decimals: 18
+
+# ============================================================================
+# DEFI PROTOCOL ADDRESSES
+# ============================================================================
+defi:
+ # Uniswap V3 Router address
+ uniswap_v3_router: "0xE592427A0AEce92De3Edee1F18E0157C05861564"
+
+ # WETH address
+ weth_address: "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
+
+ # Default Uniswap fee tier (0.3%)
+ default_uniswap_fee: 3000
+
+# ============================================================================
+# SWAP CONFIGURATION
+# ============================================================================
+swaps:
+ # Amount to swap for each token during initial setup (10 ETH in wei)
+ initial_swap_amount: "10000000000000000000"
+
+ # Function selector for Uniswap V3 exactInputSingle
+ uniswap_exact_input_single_selector: "0x414bf389"
+
+# ============================================================================
+# EVM ENVIRONMENT CONFIGURATION
+# ============================================================================
+# These values override the defaults in evm_config.py
+
+# Question generation LLM settings
+question_generation_model: "gpt-4o-mini"
+question_generation_temperature: 0.6
+question_generation_max_tokens: 256
+question_generation_n: 3
+
+# Question selection strategy
+weak_performance_threshold: 0.9
+weak_area_focus_ratio: 0.8
diff --git a/environments/community/ethereum_virtual_machine/evm_config.py b/environments/community/ethereum_virtual_machine/evm_config.py
new file mode 100644
index 00000000..f8f1b879
--- /dev/null
+++ b/environments/community/ethereum_virtual_machine/evm_config.py
@@ -0,0 +1,66 @@
+from typing import List
+
+from pydantic import Field
+from pydantic_settings import BaseSettings
+
+from atroposlib.envs.base import BaseEnvConfig
+
+
+class EVMEnvConfig(BaseEnvConfig, BaseSettings):
+ """Configuration for the EVM Environment"""
+
+ # Logging configuration
+ debug_logging: bool = Field(
+ default=False, description="Enable detailed debug logging"
+ )
+ suppress_base_env_logs: bool = Field(
+ default=True,
+ description="Suppress base environment INFO logs to reduce noise",
+ )
+
+ # Anvil configuration
+ anvil_config_path: str = Field(
+ "configs/token_transfers.yaml",
+ description="Path to Anvil configuration YAML file",
+ )
+ max_steps: int = Field(1, description="Only one step per transaction episode")
+ question_types: List[str] = Field(
+ default=[
+ "ETH transfer",
+ "ERC-20 transfer using 18 decimal token",
+ "ERC-20 transfer using a non-18 decimal token",
+ ],
+ description="Types of questions to generate for the agent",
+ )
+
+ # Question selection strategy configuration
+ weak_performance_threshold: float = Field(
+ default=0.9,
+ description="Performance threshold below which question types are considered weak (0.0-1.0)",
+ )
+ weak_area_focus_ratio: float = Field(
+ default=0.8,
+ description="Probability of focusing on weak areas vs strong areas (0.0-1.0)",
+ )
+
+ # LLM generation configuration for dynamic questions
+ question_generation_model: str = Field(
+ default="gpt-4o-mini",
+ description="Model to use for generating dynamic questions",
+ )
+ question_generation_temperature: float = Field(
+ default=0.6,
+ description="Temperature for question generation (0.0-2.0)",
+ )
+ question_generation_max_tokens: int = Field(
+ default=256,
+ description="Maximum tokens for question generation",
+ )
+ question_generation_n: int = Field(
+ default=3,
+ description="Number of responses to generate per question generation call",
+ )
+
+ class Config:
+ env_file = "configs/token_transfers.yaml"
+ env_file_encoding = "utf-8"
diff --git a/environments/community/ethereum_virtual_machine/evm_server.py b/environments/community/ethereum_virtual_machine/evm_server.py
new file mode 100644
index 00000000..f83d38d0
--- /dev/null
+++ b/environments/community/ethereum_virtual_machine/evm_server.py
@@ -0,0 +1,779 @@
+#!/usr/bin/env python3
+"""
+EVM Environment for Atropos: Ethereum Virtual Machine Transaction Agent Training
+
+This environment trains language models to generate and execute profitable Ethereum transactions
+using Anvil (Foundry's local blockchain simulation).
+"""
+
+import json
+import logging
+import os
+import random
+import re
+import sys
+import traceback
+from typing import Any, Dict, List, Optional, Tuple
+
+from anvil import AnvilBackend, AnvilConfig
+from evm_config import EVMEnvConfig
+from openai import OpenAI
+from utils import cleanup_blockchain, cleanup_manager, setup_evm_error_message
+
+from atroposlib.envs.base import BaseEnv, ScoredDataGroup
+from atroposlib.envs.server_handling.server_manager import APIServerConfig
+from atroposlib.type_definitions import Item
+from atroposlib.utils.tokenize_for_trainer import tokenize_for_trainer
+
+# Add logger
+logger = logging.getLogger(__name__)
+
+# System prompt for EVM transaction agent
+system_prompt = (
+ "You are a deep thinking AI, you may use extremely long chains of thought "
+ "to deeply consider the problem and deliberate with yourself via systematic "
+ "reasoning processes to help come to a correct solution prior to answering. "
+ "You should enclose your thoughts and internal monologue inside "
+ "tags, and then provide your solution or response to the problem.\n\n"
+)
+
+system_prompt += """You are allowed to use a maximum of 2048 tokens. Please strive to use less.
+
+You are here to assist a user execute transfers of both ETH and ERC-20 tokens as requested.
+Your job is to generate correct Ethereum transaction data for the requested action.
+
+IMPORTANT: After your thinking, your response must include a valid JSON transaction object:
+{"to": "0x...", "value": "amount_in_wei", "data": "0x..."}
+
+- 'to': The recipient address (contract or EOA)
+- 'value': Amount of ETH to send in wei (string)
+- 'data': Transaction data
+
+If you do not provide a valid JSON transaction object, your submission will be ignored and you \
+will receive a score of -1.0.
+
+Example 1:
+{
+ "to": "0xe688b84b23f322a994A53dbF8E15FA82CDB71127",
+ "value": "0.01",
+ "data": "0x"
+}
+
+Example 2:
+{
+ "to": "0xEA29e9da69317d80075fBfc836E843C6d65971F5",
+ "value": "0x",
+ "data": "0xa9059cbb000000000000000000000000ea29e9da69317d80075fbfc836e843c6d65971f50000000000000000000000000000000000000000000000000000000005f5e100" # noqa: E501
+}
+"""
+
+
+class EVMEnv(BaseEnv):
+ """EVM Transaction Environment for training agents to interact with Ethereum"""
+
+ name = "evm_agent"
+ env_config_cls = EVMEnvConfig
+
+ def __init__(
+ self,
+ config: EVMEnvConfig,
+ server_configs: List[APIServerConfig],
+ slurm=True,
+ testing=False,
+ ):
+ """Initialize the EVM environment"""
+ super().__init__(config, server_configs, slurm, testing)
+
+ # Set up minimal logging - only for essential operations
+ self.logger = logging.getLogger(f"{self.__class__.__name__}")
+ self.logger.setLevel(logging.WARNING) # Only warnings and errors
+ self.logger.propagate = False
+ if not self.logger.handlers:
+ handler = logging.StreamHandler()
+ formatter = logging.Formatter("%(message)s") # Clean format
+ handler.setFormatter(formatter)
+ self.logger.addHandler(handler)
+
+ # Suppress base environment logs
+ if config.suppress_base_env_logs:
+ base_logger = logging.getLogger("atroposlib.envs.base")
+ base_logger.setLevel(logging.WARNING)
+
+ # Load Anvil configuration
+ self.anvil_config = AnvilConfig(config.anvil_config_path)
+
+ # Initialize blockchain handler
+ self.blockchain = AnvilBackend(self.anvil_config)
+
+ # Performance tracking for adaptive question selection
+ self.question_performance = {qtype: [] for qtype in config.question_types}
+ self.current_question_type = None
+
+ # Store current prompt data for scoring
+ self.current_prompt_data = None
+
+ # Register cleanup with the global cleanup manager
+ cleanup_manager.register_cleanup(cleanup_blockchain, self.blockchain)
+
+ async def setup(self):
+ """Setup the EVM environment and start Anvil"""
+ try:
+ print("Starting Anvil blockchain simulation...")
+ self.blockchain.start()
+ self.blockchain.setup_wallet()
+ print("EVM environment setup completed successfully.")
+ except Exception as e:
+ error_message = setup_evm_error_message(self.anvil_config, e)
+ print(error_message)
+
+ # Cleanup and exit
+ cleanup_blockchain(self.blockchain)
+ sys.exit(1)
+
+ async def get_next_item(self) -> Optional[Item]:
+ """Generate the next transaction challenge for the agent"""
+ try:
+ # Select question type based on performance (exploration vs exploitation)
+ question_type = self._select_question_type()
+ self.current_question_type = question_type
+
+ # Generate question prompt and get structured data
+ prompt_text, prompt_data = await self._generate_question_prompt(
+ question_type
+ )
+
+ # Store the prompt data for scoring
+ self.current_prompt_data = prompt_data
+
+ # Display Generated Input
+ self.logger.debug("\n=== Generated Input ===")
+ self.logger.debug(prompt_text)
+ self.logger.debug("=" * 50)
+
+ prompt = tuple(
+ [frozenset({"role": "user", "content": prompt_text}.items())]
+ )
+
+ return (prompt, None, None)
+
+ except Exception as e:
+ print(f"Error in get_next_item: {e}")
+ traceback.print_exc()
+ return None
+
+ def _select_question_type(self) -> str:
+ """Select question type using weakness-targeting strategy with 80/20 ratio"""
+ # If no performance data yet, select randomly
+ if not any(self.question_performance.values()):
+ return random.choice(self.config.question_types)
+
+ # Calculate average scores for each question type
+ avg_scores = {}
+ for qtype, scores in self.question_performance.items():
+ if scores:
+ avg_scores[qtype] = sum(scores) / len(scores)
+ else:
+ avg_scores[qtype] = 0.0 # Prioritize untested question types
+
+ # Split into weak and strong areas based on configurable performance threshold
+ weak_threshold = self.config.weak_performance_threshold
+
+ weak_qtypes = [
+ qtype for qtype, score in avg_scores.items() if score < weak_threshold
+ ]
+ strong_qtypes = [
+ qtype for qtype, score in avg_scores.items() if score >= weak_threshold
+ ]
+
+ # Configurable focus on weak areas vs strong areas for mastery maintenance
+ if random.random() < self.config.weak_area_focus_ratio and weak_qtypes:
+ selected_type = random.choice(weak_qtypes)
+ elif strong_qtypes:
+ selected_type = random.choice(strong_qtypes)
+ else:
+ selected_type = random.choice(list(avg_scores.keys()))
+
+ return selected_type
+
+ async def _generate_question_prompt(
+ self, question_type: str
+ ) -> Tuple[str, Optional[Dict[str, Any]]]:
+ """Generate a dynamic question prompt using LLM based on the question type"""
+
+ # Initialize OpenAI client
+ client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
+
+ # Create prompt for LLM to generate a request
+ llm_prompt = f"""You are generating a natural language transaction request for an Ethereum blockchain agent.
+
+TRANSACTION TYPE: "{question_type}"
+
+CONTEXT:
+- Wallet Address: {self.anvil_config.funding.custom_wallet}
+- Current Balances: {json.dumps(self.blockchain.get_wallet_balances(), indent=2)}
+
+TASK:
+Generate a realistic, conversational transaction request that:
+1. Matches the specified transaction type exactly
+2. Does not use more than current wallet balances and typically would be small transfers or possibly larger,
+ like how a real person may use their assets
+3. Includes all necessary details (token, amount, destination address)
+4. Sounds like how a real user would naturally request a transaction
+5. Varies in tone and style (casual, formal, urgent, etc.)
+
+REQUIREMENTS:
+- Use realistic destination addresses (not placeholder text like
+ "0x1234567890123456789012345678901234567890")
+- Does not specify amounts larger than 50% of the current balance
+- Make the request executable
+
+Generate ONE natural language request that matches the transaction type "{question_type}".
+Respond with a JSON object with the following fields:
+- question_type: The type of transaction to generate
+- request: The natural language request text
+- destination_address: The destination address
+- transfer_token: The token to transfer
+- transfer_amount: The amount to transfer
+
+Examples:
+1. ETH transfer
+{{
+ "question_type": "ETH transfer",
+ "request": "yo, can i send 0.01 ETH to my buddy jasper? His address is jasper.eth"
+ "destination_address": "jasper.eth"
+ "transfer_token": "ETH"
+ "transfer_amount": "0.01"
+}}
+2. ERC-20 transfer using 18 decimal token
+{{
+ "question_type": "ERC-20 transfer using 18 decimal token",
+ "request": "Send 100 CRV to 0xe688b84b23f322a994A53dbF8E15FA82CDB71127"
+ "destination_address": "0xe688b84b23f322a994A53dbF8E15FA82CDB71127"
+ "transfer_token": "CRV"
+ "transfer_amount": "100"
+}}
+3. ERC-20 transfer using a non-18 decimal token (e.g. USDT)
+{{
+ "question_type": "ERC-20 transfer using a non-18 decimal token",
+ "request": "give 100 tether to 0xea29e9da69317d80075fbfc836e843c6d65971f5"
+ "destination_address": "0xea29e9da69317d80075fbfc836e843c6d65971f5"
+ "transfer_token": "USDT"
+ "transfer_amount": "100"
+}}
+"""
+
+ try:
+ # Generate multiple responses in a single call for efficiency
+ response = client.chat.completions.create(
+ model=self.config.question_generation_model,
+ messages=[{"role": "user", "content": llm_prompt}],
+ temperature=self.config.question_generation_temperature,
+ max_tokens=self.config.question_generation_max_tokens,
+ n=self.config.question_generation_n,
+ )
+
+ # Try each response until we find a valid one
+ for i, choice in enumerate(response.choices):
+ generated_content = choice.message.content.strip()
+
+ # Extract JSON from response using generic function
+ prompt_data = self._extract_json_from_response(
+ generated_content, ["question_type", "request"], "prompt"
+ )
+
+ # Validate required fields
+ if prompt_data and self._validate_prompt_data(
+ prompt_data, question_type
+ ):
+ return prompt_data["request"], prompt_data
+
+ # All choices failed, use fallback
+ fallback_data = {
+ "question_type": question_type,
+ "request": "Transfer 0.01 ETH to 0x0000000000000000000000000000000000000000",
+ "destination_address": "0x0000000000000000000000000000000000000000",
+ "transfer_token": "ETH",
+ "transfer_amount": "0.01",
+ }
+ return fallback_data["request"], fallback_data
+
+ except Exception:
+ fallback_data = {
+ "question_type": question_type,
+ "request": "Transfer 0.01 ETH to 0x0000000000000000000000000000000000000000",
+ "destination_address": "0x0000000000000000000000000000000000000000",
+ "transfer_token": "ETH",
+ "transfer_amount": "0.01",
+ }
+ return fallback_data["request"], fallback_data
+
+ def _extract_json_from_response(
+ self, response: str, required_keys: List[str], json_type: str = "JSON"
+ ) -> Optional[Dict[str, Any]]:
+ """Generic JSON extraction from LLM response, handling thinking tags"""
+ if not isinstance(response, str):
+ return None
+
+ # First, try to extract content after thinking tags (following SWE pattern)
+ content_after_think = response
+ think_end_match = re.search(r"", response, re.IGNORECASE)
+ if think_end_match:
+ content_after_think = response[think_end_match.end() :].strip()
+
+ # Create patterns based on required keys
+ if len(required_keys) >= 2:
+ key1, key2 = required_keys[0], required_keys[1]
+ json_patterns = [
+ rf'\{{[^{{}}]*"{key1}"[^{{}}]*"{key2}"[^{{}}]*\}}', # Simple pattern with first two keys
+ rf'\{{.*?"{key1}".*?"{key2}".*?\}}', # Flexible pattern with first two keys
+ r"\{.*?\}", # Any JSON object
+ ]
+ else:
+ json_patterns = [r"\{.*?\}"] # Fallback to any JSON
+
+ for pattern in json_patterns:
+ matches = re.findall(pattern, content_after_think, re.DOTALL)
+ for match in matches:
+ try:
+ # Clean up the JSON string
+ json_str = match.strip()
+
+ # Parse the JSON
+ obj = json.loads(json_str)
+
+ # Verify it has the expected structure
+ if isinstance(obj, dict) and required_keys[0] in obj:
+ return obj
+
+ except json.JSONDecodeError:
+ continue
+
+ return None
+
+ def _extract_transaction_json(self, response: str) -> Optional[Dict[str, Any]]:
+ """Extract transaction JSON from LLM response"""
+ return self._extract_json_from_response(
+ response, ["to", "value"], "transaction"
+ )
+
+ def _validate_prompt_data(
+ self, prompt_data: Dict[str, Any], expected_question_type: str
+ ) -> bool:
+ """Validate that prompt data has all required fields and correct question type"""
+ required_fields = [
+ "question_type",
+ "request",
+ "destination_address",
+ "transfer_token",
+ "transfer_amount",
+ ]
+
+ # Check all required fields are present
+ if not all(field in prompt_data for field in required_fields):
+ return False
+
+ # Check question type matches what we requested
+ if prompt_data["question_type"] != expected_question_type:
+ return False
+
+ # Check that fields are not empty
+ for field in required_fields:
+ if not prompt_data[field] or str(prompt_data[field]).strip() == "":
+ return False
+
+ return True
+
+ async def collect_trajectories(
+ self, item: Item
+ ) -> Tuple[Optional[ScoredDataGroup], List[Item]]:
+ """Collect trajectories by having the agent generate transactions"""
+ to_score = []
+ to_backlog = []
+
+ system_msg = {
+ "role": "system",
+ "content": system_prompt,
+ }
+
+ user_msg = {"role": "user", "content": dict(item[0][0])["content"]}
+
+ messages = [system_msg, user_msg]
+
+ try:
+ # Use proper Atropos framework pattern like humor generation
+ chat_completions = await self.server.chat_completion(
+ messages=messages,
+ n=self.config.group_size,
+ max_tokens=2048,
+ )
+
+ # Store completions for output saving
+ self.last_completions = []
+
+ for i, choice in enumerate(chat_completions.choices):
+ # Store the completion
+ self.last_completions.append(choice.message.content)
+
+ # Display Generated Output
+ self.logger.debug(f"\n=== Generated Output {i+1} ===")
+ self.logger.debug(choice.message.content)
+ self.logger.debug("=" * 50)
+
+ history = [
+ {"role": "system", "content": system_msg["content"]},
+ {"role": "user", "content": user_msg["content"]},
+ {"role": "assistant", "content": choice.message.content},
+ ]
+ to_score.append((history, item[1], None))
+
+ except Exception as e:
+ print(f"Error in collect_trajectories: {e}")
+ traceback.print_exc()
+ to_backlog.append(item)
+
+ if not to_score:
+ return None, to_backlog
+
+ scored_data = await self.score(to_score)
+ return scored_data, to_backlog
+
+ async def score(self, rollout_group_data) -> Optional[ScoredDataGroup]:
+ """Score the generated transactions by executing them on Anvil"""
+ if not rollout_group_data:
+ return None
+
+ scores = ScoredDataGroup()
+ scores["tokens"] = []
+ scores["masks"] = []
+ scores["scores"] = []
+ scores["advantages"] = None
+ scores["ref_logprobs"] = None
+ scores["messages"] = None
+ scores["group_overrides"] = {"group_size": self.config.group_size}
+ scores["overrides"] = None
+ scores["ground_truths"] = []
+
+ for i, item in enumerate(rollout_group_data):
+ out = tokenize_for_trainer(self.tokenizer, item[0])
+ tokens = out["tokens"]
+ masks = out["masks"]
+
+ try:
+ # Extract the agent's response (transaction JSON)
+ agent_response = item[0][-1]["content"].strip()
+ ground_truth = item[1] if isinstance(item[1], str) else ""
+
+ # Score the transaction
+ score = await self._score_transaction(agent_response)
+
+ # Display Score
+ self.logger.debug(f"\n=== Score {i+1} ===")
+ self.logger.debug(f"{score}")
+ self.logger.debug("=" * 50)
+
+ # Track performance for this question type
+ if self.current_question_type:
+ self.question_performance[self.current_question_type].append(score)
+ # Keep only last 10 scores per question type
+ if len(self.question_performance[self.current_question_type]) > 10:
+ self.question_performance[self.current_question_type].pop(0)
+
+ except Exception as e:
+ score = -1.0
+ ground_truth = item[1] if isinstance(item[1], str) else ""
+
+ # Display Score for error case
+ print(f"\n=== Score {i+1} ===")
+ print(f"{score} (Error: {e})")
+ print("=" * 50)
+
+ # Skip if too few tokens
+ if len([i for i in masks if i != -100]) < 10:
+ continue
+
+ scores["tokens"].append(tokens)
+ scores["masks"].append(masks)
+ scores["scores"].append(score)
+ scores["ground_truths"].append(ground_truth)
+
+ if len(scores["tokens"]) >= self.config.group_size:
+ break
+
+ if not scores["tokens"]:
+ return None
+
+ return scores
+
+ async def _score_transaction(self, agent_response: str) -> float:
+ """Score a transaction based on multiple criteria"""
+ try:
+ # First, extract JSON from the response (handling thinking tags)
+ tx_obj = self._extract_transaction_json(agent_response)
+ if tx_obj is None:
+ return -1.0 # Could not extract valid JSON
+
+ # Validate required fields
+ if not all(field in tx_obj for field in ["to", "value", "data"]):
+ return -1.0 # Missing required fields
+
+ # Get expected transfer details from stored prompt data
+ if not hasattr(self, "current_prompt_data") or not self.current_prompt_data:
+ return -1.0
+
+ expected_token = self.current_prompt_data.get("transfer_token", "ETH")
+ expected_amount = self.current_prompt_data.get("transfer_amount", "0")
+ expected_destination = self.current_prompt_data.get(
+ "destination_address", ""
+ )
+
+ # Get sender and destination addresses
+ sender_address = self.anvil_config.funding.custom_wallet
+ destination_address = tx_obj.get("to", "")
+
+ # Get relevant tokens to check
+ relevant_tokens = ["ETH"]
+ if expected_token != "ETH":
+ relevant_tokens.append(expected_token)
+
+ # Take a snapshot before execution
+ snapshot_id = self.blockchain.snapshot()
+
+ # Get pre-execution balances for both addresses
+ pre_balances = {
+ "sender": self.blockchain.get_wallet_balances(
+ sender_address, relevant_tokens
+ ),
+ "destination": self.blockchain.get_wallet_balances(
+ destination_address, relevant_tokens
+ ),
+ }
+
+ try:
+ # Execute the transaction
+ result = self.blockchain.execute_transaction(tx_obj)
+
+ # Get post-execution balances
+ post_balances = {
+ "sender": self.blockchain.get_wallet_balances(
+ sender_address, relevant_tokens
+ ),
+ "destination": self.blockchain.get_wallet_balances(
+ destination_address, relevant_tokens
+ ),
+ }
+
+ # Calculate score based on execution result and balance changes
+ score = self._calculate_transaction_score(
+ tx_obj,
+ result,
+ agent_response,
+ pre_balances,
+ post_balances,
+ expected_token,
+ expected_amount,
+ expected_destination,
+ )
+
+ # Revert to snapshot to maintain clean state
+ self.blockchain.revert(snapshot_id)
+
+ return score
+
+ except Exception:
+ # Revert on any error
+ self.blockchain.revert(snapshot_id)
+ return -1.0 # Execution error
+
+ except Exception:
+ return -1.0 # General error
+
+ def _calculate_transaction_score(
+ self,
+ tx_obj: Dict[str, Any],
+ result: Dict[str, Any],
+ agent_response: str,
+ pre_balances: Dict[str, Dict[str, Any]],
+ post_balances: Dict[str, Dict[str, Any]],
+ transfer_token: str,
+ transfer_amount: str,
+ destination_address: str,
+ ) -> float:
+ """Calculate score based on transaction execution results and balance changes"""
+ base_score = 0.0
+
+ # 1. Successful execution (0.3 points)
+ if result.get("status") == "0x1":
+ base_score += 0.3 # Transaction succeeded
+
+ # 2. Correct transaction - exact balance verification (0.5 points)
+ balance_score = self._verify_expected_transfers(
+ pre_balances,
+ post_balances,
+ transfer_token,
+ transfer_amount,
+ destination_address,
+ )
+ base_score += balance_score
+
+ # 3. Thinking quality (max 0.1 points, with negative for missing thinking)
+ thinking_score = self._analyze_thinking_quality(agent_response)
+ base_score += thinking_score # Range: -0.2 to +0.1
+
+ # 4. To field verification (0.05 points)
+ tx_to = tx_obj.get("to", "").lower()
+ expected_to = destination_address.lower()
+ if tx_to == expected_to:
+ base_score += 0.05
+
+ # 5. Data field verification (0.05 points)
+ data_field = tx_obj.get("data", "0x")
+ if transfer_token == "ETH":
+ # ETH transfer should have empty data field
+ if data_field == "0x":
+ base_score += 0.05
+ else:
+ # ERC-20 transfer should have transfer function call data
+ if data_field.startswith("0xa9059cbb"): # transfer function selector
+ base_score += 0.05
+
+ return base_score
+
+ def _verify_expected_transfers(
+ self,
+ pre_balances: Dict[str, Dict[str, Any]],
+ post_balances: Dict[str, Dict[str, Any]],
+ expected_token: str,
+ expected_amount: str,
+ expected_destination: str,
+ ) -> float:
+ """Verify that the expected transfer amounts occurred"""
+ try:
+ expected_amount_float = float(expected_amount)
+
+ # For ETH transfers - only check destination address
+ if expected_token == "ETH":
+ # Extract balance values from the nested dictionary structure
+ dest_pre_balance = (
+ pre_balances["destination"].get("ETH", {}).get("balance", 0)
+ )
+ dest_post_balance = (
+ post_balances["destination"].get("ETH", {}).get("balance", 0)
+ )
+
+ dest_eth_change = float(dest_post_balance) - float(dest_pre_balance)
+
+ # Check if destination gained exactly the expected amount
+ if abs(dest_eth_change - expected_amount_float) < 1e-10:
+ return 0.5
+
+ # For ERC-20 transfers - check both sender and destination
+ else:
+ # Extract balance values from the nested dictionary structure
+ sender_pre_balance = (
+ pre_balances["sender"].get(expected_token, {}).get("balance", 0)
+ )
+ sender_post_balance = (
+ post_balances["sender"].get(expected_token, {}).get("balance", 0)
+ )
+ dest_pre_balance = (
+ pre_balances["destination"]
+ .get(expected_token, {})
+ .get("balance", 0)
+ )
+ dest_post_balance = (
+ post_balances["destination"]
+ .get(expected_token, {})
+ .get("balance", 0)
+ )
+
+ sender_token_change = float(sender_post_balance) - float(
+ sender_pre_balance
+ )
+ dest_token_change = float(dest_post_balance) - float(dest_pre_balance)
+
+ # For ERC-20, expect exact amounts (no gas costs in token)
+ if (
+ abs(sender_token_change + expected_amount_float) < 1e-6
+ and abs(dest_token_change - expected_amount_float) < 1e-6
+ ):
+ return 0.5
+
+ return 0.0 # Transfer amounts don't match expectations
+
+ except (ValueError, TypeError):
+ return 0.0
+
+ def _analyze_thinking_quality(self, response: str) -> float:
+ """Evaluate thinking tag quality with max 0.1 points, negative for missing thinking"""
+ thinking_score = 0.0
+
+ # Check for thinking tags
+ if "" not in response or "" not in response:
+ return -0.2 # Penalty for no thinking tags
+
+ # Extract thinking content
+ try:
+ thinking_match = re.search(r"(.*?)", response, re.DOTALL)
+ if not thinking_match:
+ return -0.2 # No thinking content found
+
+ thinking_content = thinking_match.group(1).strip()
+ if not thinking_content:
+ return -0.1 # Empty thinking tags
+
+ # Basic quality assessment for positive score (max 0.1)
+ word_count = len(thinking_content.split())
+
+ # Award points based on thinking depth
+ if word_count >= 50: # Substantial thinking
+ thinking_score += 0.1
+ elif word_count >= 20: # Moderate thinking
+ thinking_score += 0.05
+ elif word_count >= 5: # Minimal thinking
+ thinking_score += 0.02
+
+ return thinking_score
+
+ except Exception:
+ return -0.1 # Error in processing thinking
+
+ async def evaluate(self, *args, **kwargs):
+ """Evaluation method - could implement portfolio performance tracking"""
+ return
+
+ def close(self):
+ """Clean up resources"""
+ cleanup_blockchain(self.blockchain)
+
+ @classmethod
+ def config_init(cls) -> Tuple[EVMEnvConfig, List[APIServerConfig]]:
+ """Initialize configuration for EVM environment"""
+ # pydantic-settings automatically loads from YAML file
+ env_config = EVMEnvConfig(
+ tokenizer_name="NousResearch/Hermes-3-Llama-3.1-8B",
+ group_size=4,
+ use_wandb=True,
+ rollout_server_url="http://localhost:8000",
+ total_steps=500,
+ batch_size=16,
+ steps_per_eval=50,
+ max_token_length=2048,
+ wandb_name="evm-agent",
+ anvil_config_path="configs/token_transfers.yaml",
+ )
+
+ # API server configuration
+ server_configs = [
+ APIServerConfig(
+ model_name="gpt-4o-mini",
+ base_url=None, # Use OpenAI directly
+ api_key=os.environ.get("OPENAI_API_KEY"),
+ num_requests_for_eval=64,
+ ),
+ ]
+
+ return env_config, server_configs
+
+
+if __name__ == "__main__":
+ EVMEnv.cli()
diff --git a/environments/community/ethereum_virtual_machine/requirements.txt b/environments/community/ethereum_virtual_machine/requirements.txt
new file mode 100644
index 00000000..a1ae7659
--- /dev/null
+++ b/environments/community/ethereum_virtual_machine/requirements.txt
@@ -0,0 +1,3 @@
+# EVM Environment specific dependencies
+# (openai, pydantic, requests, gymnasium, numpy are provided by atroposlib)
+pyyaml>=6.0
diff --git a/environments/community/ethereum_virtual_machine/setup.sh b/environments/community/ethereum_virtual_machine/setup.sh
new file mode 100644
index 00000000..98054296
--- /dev/null
+++ b/environments/community/ethereum_virtual_machine/setup.sh
@@ -0,0 +1,165 @@
+#!/bin/bash
+
+# Ethereum Virtual Machine Environment Setup Script
+# This script installs Foundry/Anvil and sets up the Python environment
+
+set -e # Exit on any error
+
+echo "🔧 Setting up Ethereum Virtual Machine Environment..."
+
+# Colors for output
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+NC='\033[0m' # No Color
+
+# Function to print colored output
+print_status() {
+ echo -e "${GREEN}[INFO]${NC} $1"
+}
+
+print_warning() {
+ echo -e "${YELLOW}[WARNING]${NC} $1"
+}
+
+print_error() {
+ echo -e "${RED}[ERROR]${NC} $1"
+}
+
+# Check if Foundry is already installed
+check_foundry() {
+ if command -v anvil &> /dev/null && command -v cast &> /dev/null && command -v forge &> /dev/null; then
+ print_status "Foundry is already installed"
+ anvil --version
+ return 0
+ else
+ return 1
+ fi
+}
+
+# Install Foundry
+install_foundry() {
+ print_status "Installing Foundry..."
+
+ # Download and install Foundry
+ curl -L https://foundry.paradigm.xyz | bash
+
+ # Source the profile to update PATH
+ if [ -f ~/.bashrc ]; then
+ source ~/.bashrc
+ elif [ -f ~/.zshrc ]; then
+ source ~/.zshrc
+ fi
+
+ # Run foundryup to install the latest version
+ if command -v foundryup &> /dev/null; then
+ foundryup
+ else
+ print_warning "foundryup not found in PATH. Please restart your terminal and run 'foundryup'"
+ print_warning "Then run this setup script again."
+ exit 1
+ fi
+}
+
+# Verify installation
+verify_installation() {
+ print_status "Verifying installation..."
+
+ if ! command -v anvil &> /dev/null; then
+ print_error "Anvil not found. Installation may have failed."
+ print_error "Please restart your terminal and try again."
+ exit 1
+ fi
+
+ if ! command -v cast &> /dev/null; then
+ print_error "Cast not found. Installation may have failed."
+ exit 1
+ fi
+
+ print_status "✅ Foundry tools installed successfully:"
+ echo " - $(anvil --version)"
+ echo " - $(cast --version)"
+ echo " - $(forge --version)"
+}
+
+# Install Python dependencies
+install_python_deps() {
+ print_status "Installing Python dependencies..."
+
+ if [ -f "requirements.txt" ]; then
+ pip install -r requirements.txt
+ print_status "Installed EVM-specific dependencies from requirements.txt"
+ else
+ print_warning "requirements.txt not found. Installing minimal dependencies..."
+ pip install pyyaml>=6.0
+ fi
+
+ print_status "Note: Main dependencies (openai, pydantic, requests) are provided by atroposlib"
+}
+
+# Check for OpenAI API key
+check_openai_key() {
+ if [ -z "$OPENAI_API_KEY" ]; then
+ print_warning "OPENAI_API_KEY environment variable not set"
+ echo " Set it with: export OPENAI_API_KEY='your-api-key-here'"
+ echo " This is required for question generation"
+ else
+ print_status "✅ OPENAI_API_KEY is set"
+ fi
+}
+
+# Test the configuration
+test_config() {
+ print_status "Testing configuration..."
+
+ python -c "
+try:
+ from anvil import AnvilConfig
+ config = AnvilConfig()
+ print('✅ Configuration loaded successfully')
+ print(f' - Config file: {config.config_file}')
+ print(f' - Network port: {config.anvil.port}')
+ print(f' - Fork URL: {config.anvil.fork_url}')
+ print(f' - Custom wallet: {config.funding.custom_wallet}')
+except Exception as e:
+ print(f'❌ Configuration test failed: {e}')
+ exit(1)
+"
+}
+
+# Main setup process
+main() {
+ echo "🚀 Starting setup process..."
+ echo
+
+ # Check if already installed
+ if check_foundry; then
+ print_status "Foundry already installed, skipping installation"
+ else
+ install_foundry
+ verify_installation
+ fi
+
+ echo
+ install_python_deps
+
+ echo
+ check_openai_key
+
+ echo
+ test_config
+
+ echo
+ print_status "🎉 Setup completed successfully!"
+ echo
+ echo "Next steps:"
+ echo " 1. Configure configs/token_transfers.yaml if needed"
+ echo " 2. Set OPENAI_API_KEY if not already set"
+ echo " 3. Run inference: python evm_server.py process --env.data_path_to_save_groups evm_rollouts.jsonl"
+ echo " 4. Or run training: python evm_server.py serve"
+ echo
+ echo "For troubleshooting, see README.md"
+}
+
+# Run main function
+main "$@"
diff --git a/environments/community/ethereum_virtual_machine/utils.py b/environments/community/ethereum_virtual_machine/utils.py
new file mode 100644
index 00000000..fadabca4
--- /dev/null
+++ b/environments/community/ethereum_virtual_machine/utils.py
@@ -0,0 +1,84 @@
+"""
+Utility functions for the EVM Environment
+
+This module contains cleanup handlers, signal management, and other utility functions.
+"""
+
+import atexit
+import logging
+import signal
+import sys
+
+
+class CleanupManager:
+ """Manages cleanup operations for the EVM environment"""
+
+ def __init__(self):
+ self.cleanup_functions = []
+ self.logger = logging.getLogger(__name__)
+ self._setup_handlers()
+
+ def _setup_handlers(self):
+ """Setup cleanup handlers for various exit scenarios"""
+ # Register cleanup function to run on normal exit
+ atexit.register(self._execute_cleanup)
+
+ # Register signal handlers for graceful shutdown
+ signal.signal(signal.SIGINT, self._signal_handler) # Ctrl+C
+ signal.signal(signal.SIGTERM, self._signal_handler) # Termination signal
+
+ # On Windows, also handle SIGBREAK
+ if hasattr(signal, "SIGBREAK"):
+ signal.signal(signal.SIGBREAK, self._signal_handler)
+
+ def register_cleanup(self, cleanup_func, *args, **kwargs):
+ """Register a cleanup function to be called on exit"""
+ self.cleanup_functions.append((cleanup_func, args, kwargs))
+
+ def _signal_handler(self, signum, frame):
+ """Handle shutdown signals gracefully"""
+ print(f"\nReceived signal {signum}, shutting down gracefully...")
+ self._execute_cleanup()
+ sys.exit(0)
+
+ def _execute_cleanup(self):
+ """Execute all registered cleanup functions"""
+ for cleanup_func, args, kwargs in self.cleanup_functions:
+ try:
+ cleanup_func(*args, **kwargs)
+ except Exception as e:
+ print(f"Error during cleanup: {e}")
+
+
+def setup_evm_error_message(anvil_config, error: Exception) -> str:
+ """Generate a comprehensive error message for EVM setup failures"""
+ error_message = f"\n❌ Error setting up EVM environment: {error}"
+ error_message += "\n\n🔧 Troubleshooting suggestions:"
+ error_message += "\n 1. Check if Anvil is already running on the configured port"
+ error_message += "\n 2. Ensure no previous Anvil processes are still running:"
+ error_message += "\n - Run: pkill -f anvil"
+ error_message += "\n - Or: ps aux | grep anvil"
+ error_message += "\n 3. Verify Foundry/Anvil is properly installed:"
+ error_message += "\n - Run: anvil --version"
+ error_message += "\n 4. Check if the port is available:"
+ error_message += f"\n - Run: netstat -tulpn | grep {anvil_config.anvil.port}"
+ error_message += (
+ "\n\n💡 Try restarting the environment after addressing these issues."
+ )
+
+ return error_message
+
+
+def cleanup_blockchain(blockchain) -> None:
+ """Clean up blockchain resources"""
+ try:
+ if blockchain:
+ print("Stopping Anvil blockchain...")
+ blockchain.stop()
+ print("Anvil stopped successfully.")
+ except Exception as e:
+ print(f"Error during blockchain cleanup: {e}")
+
+
+# Global cleanup manager instance
+cleanup_manager = CleanupManager()