Ethereum Virtual Machine Text to Transaction Environment (#187)

* EVM-text_to_transaction

* update structure

* Update README

---------

Co-authored-by: Jeremy Melvin <jeremy@openblocklabs.com>
Jeremy Melvin 2025-06-19 16:16:00 -07:00 committed by GitHub
parent d0a253e1b5
commit 3bed7c64b9
8 changed files with 2135 additions and 0 deletions


@ -0,0 +1,164 @@
# Ethereum Virtual Machine (EVM) Transaction Agent Environment
Atropos environment for training language models to generate and execute profitable Ethereum transactions. [Anvil](https://getfoundry.sh/guides/forking-mainnet-with-cast-anvil) runs a live fork of the blockchain, so each generated transaction can be executed and the resulting state inspected to verify that it ran and performed the desired action.
## Overview
This environment trains language models to become proficient at text-to-transaction generation for EVM blockchains. The existing config uses LLM calls to dynamically generate natural-language requests for ETH and ERC-20 transfers. Question types are sampled adaptively, so transaction types on which the model scores poorly are asked more frequently. The agent learns to handle ETH transfers, ERC-20 token transfers, and complex DeFi interactions through reinforcement learning.
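The adaptive sampling described above could look roughly like the following. This is a minimal sketch, not the logic in `evm_server.py`: the function name `pick_question_type` and the `avg_scores` mapping are hypothetical, while the two defaults mirror `weak_performance_threshold` and `weak_area_focus_ratio` from the config.

```python
import random
from typing import Dict, Optional


def pick_question_type(
    avg_scores: Dict[str, float],
    weak_threshold: float = 0.9,
    weak_focus_ratio: float = 0.8,
    rng: Optional[random.Random] = None,
) -> str:
    """Pick a question type, favouring types whose average score is below the threshold."""
    if not avg_scores:
        raise ValueError("avg_scores must contain at least one question type")
    rng = rng or random.Random()

    # Split the question types into weak and strong pools.
    weak = [q for q, s in avg_scores.items() if s < weak_threshold]
    strong = [q for q in avg_scores if q not in weak]

    # Focus on weak types with probability weak_focus_ratio;
    # if one pool is empty, always draw from the other.
    if weak and (not strong or rng.random() < weak_focus_ratio):
        return rng.choice(weak)
    return rng.choice(strong)


if __name__ == "__main__":
    scores = {
        "ETH transfer": 0.95,
        "ERC-20 transfer using 18 decimal token": 0.70,
        "ERC-20 transfer using a non-18 decimal token": 0.40,
    }
    print(pick_question_type(scores))
```

With the defaults, roughly four out of five questions come from the types currently scoring below 0.9.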
## Features
- **Complete EVM Training Environment**: Full implementation of the BaseEnv interface for Atropos
- **Anvil Blockchain Simulation**: Local Ethereum fork for safe transaction testing
- **Multi-Token Support**: ETH and major ERC-20 tokens (USDC, USDT, WBTC, DAI, LINK, CRV, UNI, LDO)
- **Dynamic Question Generation**: LLM-powered generation of realistic transaction requests
- **Comprehensive Scoring System**: Multi-dimensional evaluation of transaction correctness
- **Adaptive Learning**: Performance-based question type selection for targeted improvement
- **Robust Cleanup**: Graceful handling of interruptions and proper resource management
## Files
- **evm_server.py**: Main environment implementation with transaction scoring logic
- **evm_config.py**: Environment settings (question types, question generation, selection strategy)
- **anvil.py**: Anvil blockchain backend management with integrated configuration
- **configs/token_transfers.yaml**: Blockchain simulation configuration
- **utils.py**: Cleanup handlers and utility functions
## Transaction Types
The environment trains on three primary transaction categories:
1. **ETH Transfer**: Simple Ether transfers between addresses
2. **ERC-20 Transfer (18 decimals)**: Standard token transfers (DAI, LINK, CRV, UNI, LDO)
3. **ERC-20 Transfer (non-18 decimals)**: Tokens with different decimal precision (USDC and USDT with 6 decimals, WBTC with 8)
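To illustrate why decimal precision matters, here is a minimal sketch of how an ERC-20 `transfer(address,uint256)` call can be encoded into the `{"to", "value", "data"}` object the agent is asked to produce. The helper names `to_base_units` and `erc20_transfer_tx` are hypothetical and not part of this repository; the selector `0xa9059cbb` is the standard one for `transfer(address,uint256)`.

```python
from decimal import Decimal

# First 4 bytes of keccak256("transfer(address,uint256)")
TRANSFER_SELECTOR = "a9059cbb"


def to_base_units(amount: str, decimals: int) -> int:
    """Convert a human-readable amount (e.g. "1.5") into integer token units."""
    scaled = Decimal(amount) * (Decimal(10) ** decimals)
    if scaled != scaled.to_integral_value():
        raise ValueError(f"{amount} has more than {decimals} decimal places")
    return int(scaled)


def erc20_transfer_tx(token: str, recipient: str, amount: str, decimals: int) -> dict:
    """Build a transaction object that calls transfer(recipient, amount) on the token."""
    if not recipient.startswith("0x") or len(recipient) != 42:
        raise ValueError("recipient must be a 20-byte hex address")
    recipient_word = recipient[2:].lower().zfill(64)  # address left-padded to 32 bytes
    amount_word = format(to_base_units(amount, decimals), "064x")  # uint256 as 32 bytes
    return {
        "to": token,  # the token contract, not the recipient
        "value": "0",  # no ETH is sent along with a token transfer
        "data": "0x" + TRANSFER_SELECTOR + recipient_word + amount_word,
    }


if __name__ == "__main__":
    usdc = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
    recipient = "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"
    print(erc20_transfer_tx(usdc, recipient, "1.5", 6))
```

The same amount string yields very different integers for a 6-decimal and an 18-decimal token, which is the mistake the third category is meant to expose.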
## Verified Scoring System with Anvil
Unlike traditional RL environments that rely on simulated or estimated rewards, this environment provides **cryptographically verified rewards** by executing transactions on a real Ethereum Virtual Machine simulation powered by Anvil. This ensures that scoring is based on actual blockchain state changes rather than heuristic approximations.
### Anvil-Powered Verification
**Anvil** (Foundry's blockchain simulator) enables true verification by:
- **Real EVM Execution**: Transactions run on an actual Ethereum Virtual Machine, not a simplified simulation
- **Mainnet Fork**: Uses real mainnet state with actual token contracts and balances
- **Cryptographic Verification**: Transaction success/failure is determined by EVM consensus rules
- **Atomic State Management**: Blockchain snapshots ensure clean evaluation without side effects
- **Gas Estimation**: Real gas consumption and fee calculation for realistic training
### Scoring Methodology
The environment employs a **snapshot-execute-verify-revert** cycle for each transaction:
```
1. Snapshot blockchain state
2. Record pre-execution balances
3. Execute agent's transaction
4. Measure actual state changes
5. Calculate verified score
6. Revert to clean snapshot
```
This process ensures that:
- ✅ **No False Positives**: Only correctly executed transactions receive rewards
- ✅ **Precise Measurement**: Exact balance changes are measured, not estimated
- ✅ **Isolated Evaluation**: Each transaction is evaluated independently
- ✅ **Real-World Validity**: Successful transactions would work on actual mainnet
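The cycle above can be sketched as a small wrapper. This is an illustration under stated assumptions rather than the code in `evm_server.py`: `evaluate_in_snapshot` and `score_fn` are hypothetical names, and `backend` is any object exposing the `snapshot`, `revert`, `execute_transaction`, and `get_wallet_balances` methods that `AnvilBackend` provides.

```python
from typing import Any, Callable, Dict


def evaluate_in_snapshot(
    backend: Any,
    tx: Dict[str, Any],
    score_fn: Callable[[Dict[str, Any], Dict[str, Any], Dict[str, Any]], float],
) -> float:
    """Run one transaction inside a snapshot and always revert afterwards."""
    snap_id = backend.snapshot()  # 1. snapshot blockchain state
    try:
        before = backend.get_wallet_balances()  # 2. record pre-execution balances
        result = backend.execute_transaction(tx)  # 3. execute the agent's transaction
        after = backend.get_wallet_balances()  # 4. measure actual state changes
        return score_fn(before, after, result)  # 5. calculate the score
    finally:
        backend.revert(snap_id)  # 6. revert, even if scoring raised
```

Putting the revert in a `finally` block keeps one failed evaluation from leaking state into the next one.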
### Five-Dimensional Scoring
The reward function evaluates transactions across five verified dimensions:
1. **Correct Balance Changes (0.5 points)**:
- **Most Critical Component**: Measures actual on-chain balance differences
- Compares pre/post execution balances with cryptographic precision
- For ETH: Exact wei amounts transferred to destination
- For ERC-20: Exact token units transferred (accounting for decimals)
- Verified against real contract state, not estimated
2. **Successful Execution (0.3 points)**:
- Verified by EVM status code (`0x1` = success)
- Ensures transaction doesn't revert due to insufficient funds, gas, or logic errors
- Only awarded if transaction is mined successfully
3. **Thinking Quality (±0.1 points)**
4. **Destination Address Accuracy (0.05 points)**
5. **Data Field Correctness (0.05 points)**
**Total Score Range**: -0.2 to 1.0
- **Perfect execution**: 1.0 (all components correct)
- **Missing thinking**: -0.2 (penalty for unexplained decisions)
- **Partial success**: Proportional scoring based on verified components
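As a rough illustration of how the five components could combine, the sketch below adds up the weights listed above. The function name `combine_score` and its boolean inputs are hypothetical and not taken from `evm_server.py`. It models the thinking component as a plain +0.1 or -0.1, so its minimum is -0.1; whatever additional penalty produces the stated floor of -0.2 is not modeled here.

```python
def combine_score(
    balance_ok: bool,
    executed: bool,
    thinking_ok: bool,
    destination_ok: bool,
    data_ok: bool,
) -> float:
    """Sum the per-component weights from the README into one reward."""
    score = 0.0
    if balance_ok:
        score += 0.5  # correct on-chain balance changes
    if executed:
        score += 0.3  # receipt status 0x1
    score += 0.1 if thinking_ok else -0.1  # thinking quality is a bonus or a penalty
    if destination_ok:
        score += 0.05  # destination address matches
    if data_ok:
        score += 0.05  # data field matches
    return score


if __name__ == "__main__":
    # A transaction that executed but moved the wrong amount.
    print(round(combine_score(False, True, True, True, True), 2))
```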
## Prerequisites
### System Requirements
- Python 3.8+
- [Foundry](https://book.getfoundry.sh/) (includes Anvil and Cast)
- OpenAI API key
### Installing Foundry/Anvil
**Quick Install (Recommended)**
```bash
curl -L https://foundry.paradigm.xyz | bash
foundryup
```
**Verify Installation:**
```bash
anvil --version
cast --version
forge --version
```
## Setup
1. **Install Python dependencies:**
```bash
pip install openai pydantic pydantic-settings PyYAML requests
```
2. **Set OpenAI API key:**
```bash
export OPENAI_API_KEY="your-api-key-here"
```
3. **Verify configuration:**
```bash
python -c "from anvil import AnvilConfig; config = AnvilConfig(); print('Config loaded successfully')"
```
## Usage
### Running the Environment
**For inference-only rollouts:**
```bash
cd environments/community/ethereum_virtual_machine/
python evm_server.py process \
--env.data_path_to_save_groups evm_rollouts.jsonl \
--openai.model_name gpt-4o-mini
```
**For full training with server:**
```bash
python evm_server.py serve
```
### Configuration
The environment uses `configs/token_transfers.yaml` for blockchain configuration:
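- **Network Settings**: Port (8545), mainnet fork RPC URL, Anvil log file
- **Timeouts**: Cast command and RPC timeouts, startup and buffer delays
- **Wallet Setup**: Custom wallet funding and initial token swaps
- **DeFi Addresses**: Uniswap V3 router, WETH, default fee tier
- **Token Addresses**: Whitelisted ERC-20 tokens and their decimals
- **Question Generation**: Model, temperature, and weak-area selection settings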
## Potential Training Applications
- **DeFi Agent Development**: Training models for decentralized finance interactions
- **Transaction Automation**: Building agents for routine blockchain operations
- **Smart Contract Interaction**: Learning to encode function calls and parameters
- **Risk Assessment**: Understanding transaction costs and failure modes
- **Multi-Chain Operations**: Foundation for cross-chain transaction agents


@ -0,0 +1,765 @@
"""Anvil blockchain simulation backend with integrated configuration.
This module provides a complete interface for managing Anvil (Foundry's local Ethereum node)
with integrated YAML configuration loading.
"""
from __future__ import annotations
import atexit
import logging
import signal
import subprocess
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
import requests
import yaml
# Set up anvil logger to write to anvil.log
anvil_logger = logging.getLogger("anvil")
anvil_logger.setLevel(logging.INFO)
anvil_logger.propagate = False
# Create file handler for anvil.log
if not anvil_logger.handlers:
file_handler = logging.FileHandler("anvil.log")
file_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(file_formatter)
anvil_logger.addHandler(file_handler)
class ConfigDict:
"""Helper class to provide dot-notation access to configuration dictionaries."""
def __init__(self, data: Dict[str, Any]):
for key, value in data.items():
if isinstance(value, dict):
setattr(self, key, ConfigDict(value))
else:
setattr(self, key, value)
def __getitem__(self, key):
return getattr(self, key)
def __contains__(self, key):
return hasattr(self, key)
def get(self, key, default=None):
return getattr(self, key, default)
class AnvilConfig:
"""Configuration loader for Anvil EVM environment."""
def __init__(self, config_file: str = "configs/token_transfers.yaml"):
self.config_file = Path(__file__).parent / config_file
self._raw_config = self._load_config()
# Create dot-notation accessible config sections
self.anvil = ConfigDict(
self._raw_config.get("network", {})
) # Renamed from 'network' to 'anvil'
self.timeouts = ConfigDict(self._raw_config.get("timeouts", {}))
self.funding = ConfigDict(self._raw_config.get("funding", {}))
self.whitelisted_tokens = ConfigDict(
self._raw_config.get("whitelisted_tokens", {})
)
self.defi = ConfigDict(self._raw_config.get("defi", {}))
self.swaps = ConfigDict(self._raw_config.get("swaps", {}))
    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from YAML file."""
        try:
            with open(self.config_file, "r") as f:
                config = yaml.safe_load(f)
            # safe_load returns None for an empty file; fall back to an empty
            # dict so the .get() calls in __init__ still work.
            return config or {}
        except FileNotFoundError as e:
            raise FileNotFoundError(
                f"Configuration file not found: {self.config_file}"
            ) from e
        except yaml.YAMLError as e:
            raise ValueError(f"Error parsing configuration file: {e}") from e
# Helper Methods
def get_rpc_url(self) -> str:
"""Get the full RPC URL for the Anvil instance."""
return f"http://127.0.0.1:{self.anvil.port}"
    def get_anvil_startup_command(
        self, port: Optional[int] = None, fork_url: Optional[str] = None
    ) -> list[str]:
        """Get the Anvil startup command with specified or default parameters."""
        cmd = ["anvil", "--port", str(port or self.anvil.port)]
        # Use .get() so a config without fork_url yields an unforked node
        # instead of raising AttributeError.
        fork = fork_url or self.anvil.get("fork_url")
        if fork:
            cmd += ["--fork-url", fork]
        return cmd
class AnvilBackend:
"""Anvil-specific blockchain simulation backend."""
def __init__(
self,
config: AnvilConfig,
port: Optional[int] = None,
fork_url: Optional[str] = None,
log_file: Optional[str] = None,
) -> None:
self.config = config
self.port = port or config.anvil.port
self.fork_url = fork_url or config.anvil.fork_url
self.log_file = log_file or config.anvil.log_file
self._proc: Optional[subprocess.Popen[str]] = None
self._is_wallet_setup = False
self.rpc_url = f"http://127.0.0.1:{self.port}"
# Register cleanup handlers
self._setup_cleanup_handlers()
def _setup_cleanup_handlers(self):
"""Setup cleanup handlers for various exit scenarios"""
# Register cleanup function to run on normal exit
atexit.register(self._cleanup_process)
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGINT, self._signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, self._signal_handler) # Termination signal
# On Windows, also handle SIGBREAK
if hasattr(signal, "SIGBREAK"):
signal.signal(signal.SIGBREAK, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Handle shutdown signals gracefully"""
anvil_logger.info(
f"Received signal {signum}, shutting down Anvil gracefully..."
)
self._cleanup_process()
def _cleanup_process(self):
"""Clean up Anvil process"""
if self._proc and self._proc.poll() is None:
try:
anvil_logger.info("Terminating Anvil process...")
self._proc.terminate()
try:
self._proc.wait(timeout=5)
anvil_logger.info("Anvil process terminated gracefully")
except subprocess.TimeoutExpired:
anvil_logger.warning(
"Anvil didn't terminate gracefully, killing process"
)
self._proc.kill()
self._proc.wait()
anvil_logger.info("Anvil process killed")
except Exception as e:
anvil_logger.error(f"Error during Anvil cleanup: {e}")
finally:
self._proc = None
def start(self) -> None:
"""Start the Anvil process."""
if self._proc is not None and self._proc.poll() is None:
anvil_logger.info("Anvil is already running")
return # already running
cmd = self.config.get_anvil_startup_command(self.port, self.fork_url)
# Open log file for anvil output
log_path = Path(self.log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
with open(log_path, "w") as log_f:
log_f.write(f"=== Anvil started at port {self.port} ===\n")
log_f.write(f"Command: {' '.join(cmd)}\n")
log_f.write("=" * 50 + "\n")
# Spawn Anvil as a child process with piped output so startup can be detected;
# the cleanup handlers registered in __init__ terminate it when the program exits.
self._proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
# wait until RPC ready and log output
started = False
with open(log_path, "a") as log_f:
for i in range(self.config.timeouts.anvil_startup_lines):
line = self._proc.stdout.readline() # type: ignore
if line:
log_f.write(line)
log_f.flush() # Ensure immediate write
if "Listening on" in line or "JSON-RPC server started" in line:
started = True
break
else:
# No more output, break early
break
if not started:
anvil_logger.error("Failed to launch anvil; did you run the setup script?")
raise RuntimeError("Failed to launch anvil; did you run the setup script?")
def stop(self) -> None:
"""Stop the Anvil process."""
self._cleanup_process()
def get_rpc_url(self) -> str:
"""Get the RPC URL for this Anvil instance."""
return self.rpc_url
def execute_transaction(self, tx_obj: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute transaction using cast command.
Args:
tx_obj: Transaction object from agent, with value in wei as a decimal or 0x-prefixed hex string
(e.g., {"to": "0x...", "value": "500000000000000000", "data": "0x"})
Returns:
Dict with success, gas_used, output, tx_hash, error
"""
try:
# Extract transaction fields
to_address = tx_obj.get("to", "")
value = tx_obj.get("value", "0")
data = tx_obj.get("data", "0x")
# Convert hex value to decimal for cast
if isinstance(value, str) and value.startswith("0x"):
try:
value_decimal = str(int(value, 16))
except ValueError:
value_decimal = "0"
else:
value_decimal = str(value)
# Build cast command - different approaches based on whether we have data
if data and data != "0x" and len(data) > 2:
# Transaction with data (contract interaction) - pass raw hex data as sig parameter
cmd = [
"cast",
"send",
to_address,
data, # Raw hex data as the sig parameter (selector + encoded calldata)
"--from",
self.config.funding.custom_wallet,
"--unlocked",
"--value",
value_decimal,
"--rpc-url",
self.get_rpc_url(),
]
else:
# Simple ETH transfer
cmd = [
"cast",
"send",
to_address,
"--from",
self.config.funding.custom_wallet,
"--unlocked",
"--value",
value_decimal,
"--rpc-url",
self.get_rpc_url(),
]
# Execute cast command
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=self.config.timeouts.cast_command,
)
# Parse result
if result.returncode == 0:
# Success - extract transaction hash and get receipt
tx_hash = result.stdout.strip()
gas_used = self._get_gas_used(tx_hash)
return {
"success": True,
"status": "0x1", # Success status for scoring
"gas_used": gas_used,
"tx_hash": tx_hash,
"output": result.stdout,
}
else:
# Failure - parse error
error_msg = result.stderr.strip() or result.stdout.strip()
return {
"success": False,
"status": "0x0", # Failure status for scoring
"gas_used": 0,
"error": error_msg,
"output": result.stderr + result.stdout,
}
except subprocess.TimeoutExpired:
return {
"success": False,
"status": "0x0",
"gas_used": 0,
"error": "Transaction timeout",
"output": "cast command timed out",
}
except Exception as e:
anvil_logger.error(f"Exception in execute_transaction: {str(e)}")
return {
"success": False,
"status": "0x0",
"gas_used": 0,
"error": str(e),
"output": f"Failed to execute cast: {str(e)}",
}
def setup_wallet(self, wallet_address: Optional[str] = None) -> None:
"""Setup custom wallet by impersonating it and funding with ETH."""
if self._is_wallet_setup:
return # Already setup
wallet = wallet_address or self.config.funding.custom_wallet
try:
# Impersonate the custom wallet using cast command
result = subprocess.run(
[
"cast",
"rpc",
"anvil_impersonateAccount",
wallet,
"--rpc-url",
self.get_rpc_url(),
],
capture_output=True,
text=True,
timeout=self.config.timeouts.cast_command,
)
if result.returncode != 0:
anvil_logger.error(f"Failed to impersonate wallet: {result.stderr}")
raise RuntimeError(f"Failed to impersonate wallet: {result.stderr}")
# Add buffer time
time.sleep(self.config.timeouts.wallet_setup_buffer)
# Fund the custom wallet with ETH from Anvil account 0
result = subprocess.run(
[
"cast",
"send",
wallet,
"--private-key",
self.config.funding.anvil_private_key_0,
"--value",
self.config.funding.initial_funding_amount,
"--rpc-url",
self.get_rpc_url(),
],
capture_output=True,
text=True,
timeout=self.config.timeouts.cast_command,
)
if result.returncode != 0:
anvil_logger.error(f"Failed to fund custom wallet: {result.stderr}")
raise RuntimeError(f"Failed to fund custom wallet: {result.stderr}")
# Add buffer time before starting swaps
time.sleep(self.config.timeouts.wallet_setup_buffer)
# Perform initial token swaps to diversify the wallet
self._perform_initial_swaps()
self._is_wallet_setup = True
except Exception as e:
anvil_logger.error(f"Error setting up custom wallet: {str(e)}")
raise
def snapshot(self) -> str:
"""Take a snapshot of the current blockchain state."""
return self._rpc("evm_snapshot")
def revert(self, snap_id: str) -> None:
"""Revert to a previous snapshot."""
self._rpc("evm_revert", [snap_id])
# Private helper methods
    def _rpc(self, method: str, params: Optional[List[Any]] = None) -> Any:
        """Make a JSON-RPC call to Anvil and return the "result" field."""
        import json as _json
        from urllib import request

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params or [],
        }
        data = _json.dumps(payload).encode()
        req = request.Request(
            self.get_rpc_url(), data=data, headers={"Content-Type": "application/json"}
        )
        # Bound the request so a hung node cannot block the caller indefinitely.
        with request.urlopen(req, timeout=self.config.timeouts.rpc) as resp:
            result = _json.loads(resp.read())
        if "error" in result:
            raise RuntimeError(result["error"])
        return result["result"]
    def _get_gas_used(self, tx_hash: str) -> int:
        """Get gas used from transaction receipt using cast."""
        try:
            result = subprocess.run(
                [
                    "cast",
                    "receipt",
                    tx_hash,
                    "--field",
                    "gasUsed",
                    "--rpc-url",
                    self.get_rpc_url(),
                ],
                capture_output=True,
                text=True,
                timeout=self.config.timeouts.cast_command,
            )
            if result.returncode == 0:
                # Base 0 accepts both "0x5208" and "21000", since the output
                # format may differ between cast versions.
                return int(result.stdout.strip(), 0)
        except Exception:
            pass
        return 0  # Default if we can't get gas info
def _perform_initial_swaps(self):
"""Perform initial token swaps to give the wallet a diverse portfolio."""
# Get token configuration from config
tokens = self.config.whitelisted_tokens
# Amount to swap for each token
swap_amount = self.config.swaps.initial_swap_amount
# Swap for all whitelisted tokens from config
for token_name in tokens.__dict__.keys():
try:
token_info = getattr(tokens, token_name)
# Try direct RPC approach
success = self._execute_swap_direct(
token_name, token_info.address, swap_amount
)
if success:
# Check token balance after swap
self._check_token_balance(
token_name, token_info.address, token_info.decimals
)
# Add buffer between swaps
time.sleep(self.config.timeouts.operation_buffer)
except Exception as e:
anvil_logger.warning(f"Error swapping ETH for {token_name}: {str(e)}")
continue
def _check_token_balance(self, token_name: str, token_address: str, decimals: int):
"""Check and log the balance of a specific token."""
try:
balance_result = subprocess.run(
[
"cast",
"call",
token_address,
"balanceOf(address)(uint256)",
self.config.funding.custom_wallet,
"--rpc-url",
self.get_rpc_url(),
],
capture_output=True,
text=True,
timeout=self.config.timeouts.cast_command,
)
if balance_result.returncode == 0:
balance_output = balance_result.stdout.strip()
if balance_output:
# Parse the balance - cast call returns decimal, not hex
# Handle format like "26432331438 [2.643e10]"
balance_str = balance_output.split()[
0
] # Take first part before any brackets
balance_raw = int(balance_str)
balance_formatted = balance_raw / (10**decimals)
anvil_logger.info(
f"{token_name} balance: {balance_formatted:.6f} {token_name}"
)
else:
anvil_logger.warning(
f"Empty response when checking {token_name} balance"
)
else:
anvil_logger.warning(f"Failed to check {token_name} balance")
except Exception as e:
anvil_logger.warning(f"Error checking {token_name} balance: {str(e)}")
def _direct_rpc_call(
self, method: str, params: Optional[List] = None
) -> Dict[str, Any]:
"""Make a direct RPC call to Anvil using HTTP requests."""
try:
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params or [],
}
response = requests.post(
self.get_rpc_url(),
json=payload,
timeout=self.config.timeouts.rpc,
headers={"Content-Type": "application/json"},
)
if response.status_code == 200:
result = response.json()
if "error" in result:
return {"success": False, "error": result["error"]}
else:
return {"success": True, "result": result.get("result")}
else:
return {
"success": False,
"error": f"HTTP {response.status_code}: {response.text}",
}
except requests.exceptions.Timeout:
return {"success": False, "error": "RPC timeout"}
except Exception as e:
return {"success": False, "error": str(e)}
def _execute_swap_direct(
self, token_name: str, token_address: str, swap_amount: str
) -> bool:
"""Execute swap using direct RPC calls instead of subprocess."""
try:
# Get configuration values
uniswap_router = self.config.defi.uniswap_v3_router
weth_address = self.config.defi.weth_address
# Create deadline
deadline = hex(
int(time.time()) + self.config.timeouts.transaction_deadline_offset
)
# Function selector for exactInputSingle
function_selector = self.config.swaps.uniswap_exact_input_single_selector
# Convert addresses to 32-byte hex (pad with zeros)
token_in_padded = weth_address.lower().replace("0x", "").zfill(64)
token_out_padded = token_address.lower().replace("0x", "").zfill(64)
fee_padded = hex(self.config.defi.default_uniswap_fee)[2:].zfill(64)
recipient_padded = (
self.config.funding.custom_wallet.lower().replace("0x", "").zfill(64)
)
deadline_padded = deadline[2:].zfill(64)
amount_in_padded = hex(int(swap_amount))[2:].zfill(64)
amount_out_min_padded = "0".zfill(64) # 0 minimum out
sqrt_price_limit_padded = "0".zfill(64) # 0 price limit
# Construct the full calldata
calldata = (
function_selector
+ token_in_padded
+ token_out_padded
+ fee_padded
+ recipient_padded
+ deadline_padded
+ amount_in_padded
+ amount_out_min_padded
+ sqrt_price_limit_padded
)
# Prepare transaction parameters
tx_params = {
"from": self.config.funding.custom_wallet.lower(),
"to": uniswap_router.lower(),
"value": hex(int(swap_amount)),
"data": calldata,
}
# Send the transaction via RPC
result = self._direct_rpc_call("eth_sendTransaction", [tx_params])
if result["success"]:
tx_hash = result["result"]
# Mine a block explicitly so the receipt is available even if automining is disabled
mine_result = self._direct_rpc_call("evm_mine")
if not mine_result["success"]:
return False
# Check the transaction receipt
receipt_result = self._direct_rpc_call(
"eth_getTransactionReceipt", [tx_hash]
)
if receipt_result["success"] and receipt_result["result"]:
receipt = receipt_result["result"]
if receipt.get("status") == "0x1":
return True
else:
return False
else:
return False
else:
return False
except Exception as e:
anvil_logger.warning(f"Error in {token_name} swap: {str(e)}")
return False
def get_wallet_balances(
self, wallet_address: Optional[str] = None, tokens: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Get wallet balances for specified tokens or default set.
Args:
wallet_address: Address to check balances for (defaults to custom wallet)
tokens: List of token symbols to check (defaults to ETH + whitelisted tokens)
Returns:
Dict with token symbols as keys and balance info as values
"""
wallet = wallet_address or self.config.funding.custom_wallet
# Default to ETH + whitelisted tokens if none specified
if tokens is None:
tokens = ["ETH"] + list(self.config.whitelisted_tokens.__dict__.keys())
balances = {}
for token_symbol in tokens:
try:
if token_symbol.upper() == "ETH":
# Get ETH balance using RPC call
result = self._direct_rpc_call("eth_getBalance", [wallet, "latest"])
if result["success"]:
balance_wei = int(result["result"], 16) # Convert hex to int
balance_eth = balance_wei / 10**18
balances["ETH"] = {
"symbol": "ETH",
"balance": balance_eth,
"balance_wei": str(balance_wei),
"decimals": 18,
}
else:
balances["ETH"] = {
"symbol": "ETH",
"balance": 0,
"error": result.get("error", "Unknown error"),
}
else:
# Get ERC-20 token balance using existing token check pattern
token_info = getattr(
self.config.whitelisted_tokens, token_symbol, None
)
if token_info is None:
balances[token_symbol] = {
"symbol": token_symbol,
"balance": 0,
"error": "Token not found in config",
}
continue
# Use existing cast command execution pattern
balance_result = self._execute_cast_command(
[
"cast",
"call",
token_info.address,
"balanceOf(address)(uint256)",
wallet,
"--rpc-url",
self.get_rpc_url(),
]
)
if balance_result["success"]:
balance_output = balance_result["output"].strip()
if balance_output:
# Parse the balance using existing pattern from _check_token_balance
balance_str = balance_output.split()[
0
] # Take first part before any brackets
balance_raw = int(balance_str)
balance_formatted = balance_raw / (10**token_info.decimals)
balances[token_symbol] = {
"symbol": token_symbol,
"balance": balance_formatted,
"balance_raw": balance_raw,
"decimals": token_info.decimals,
"address": token_info.address,
}
else:
balances[token_symbol] = {
"symbol": token_symbol,
"balance": 0,
"error": "Empty balance response",
}
else:
balances[token_symbol] = {
"symbol": token_symbol,
"balance": 0,
"error": balance_result.get("error", "Cast command failed"),
}
except Exception as e:
balances[token_symbol] = {
"symbol": token_symbol,
"balance": 0,
"error": str(e),
}
anvil_logger.error(
f"Exception getting {token_symbol} balance: {str(e)}"
)
return balances
def _execute_cast_command(self, cmd: List[str]) -> Dict[str, Any]:
"""
Execute a cast command and return standardized result.
Reuses existing patterns for cast command execution.
"""
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=self.config.timeouts.cast_command,
)
if result.returncode == 0:
return {"success": True, "output": result.stdout, "error": None}
else:
return {
"success": False,
"output": result.stdout,
"error": result.stderr.strip() or result.stdout.strip(),
}
except subprocess.TimeoutExpired:
return {"success": False, "output": "", "error": "Command timeout"}
except Exception as e:
return {"success": False, "output": "", "error": str(e)}
def __enter__(self):
"""Context manager entry"""
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.stop()


@ -0,0 +1,109 @@
# Anvil Configuration
# ============================================================================
# ANVIL NETWORK CONFIGURATION
# ============================================================================
network:
  port: 8545
  fork_url: "https://reth-ethereum.ithaca.xyz/rpc"
  log_file: "anvil.log"
# ============================================================================
# TIMEOUT CONFIGURATION
# ============================================================================
timeouts:
  # Subprocess timeout for cast commands (seconds)
  cast_command: 30
  # RPC request timeout (seconds)
  rpc: 30
  # Anvil startup timeout (number of output lines to read)
  anvil_startup_lines: 100
  # Anvil shutdown timeout (seconds)
  anvil_shutdown: 5
  # Buffer time between operations (seconds)
  operation_buffer: 2
  wallet_setup_buffer: 3
  # Transaction deadline offset (seconds from now)
  transaction_deadline_offset: 3600 # 1 hour
# ============================================================================
# WALLET SEED FUNDING
# ============================================================================
funding:
  # Anvil seed account (Available Account 0) - used for funding custom wallet
  anvil_account_0: "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"
  anvil_private_key_0: "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"
  custom_wallet: "0xcA4B9c26111Aacf982d85c4DE1bEBB7AeD2ffaa7"
  initial_funding_amount: "1000000000000000000000" # 1000 ETH
# ============================================================================
# WHITELISTED TOKENS
# ============================================================================
whitelisted_tokens:
  USDC:
    address: "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
    decimals: 6
  USDT:
    address: "0xdAC17F958D2ee523a2206206994597C13D831ec7"
    decimals: 6
  WBTC:
    address: "0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599"
    decimals: 8
  DAI:
    address: "0x6B175474E89094C44Da98b954EedeAC495271d0F"
    decimals: 18
  LINK:
    address: "0x514910771AF9Ca656af840dff83E8264EcF986CA"
    decimals: 18
  CRV:
    address: "0xD533a949740bb3306d119CC777fa900bA034cd52"
    decimals: 18
  UNI:
    address: "0x1f9840a85d5aF5bf1D1762F925BDADdC4201F984"
    decimals: 18
  LDO:
    address: "0x5A98FcBEA516Cf06857215779Fd812CA3beF1B32"
    decimals: 18
# ============================================================================
# DEFI PROTOCOL ADDRESSES
# ============================================================================
defi:
  # Uniswap V3 Router address
  uniswap_v3_router: "0xE592427A0AEce92De3Edee1F18E0157C05861564"
  # WETH address
  weth_address: "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
  # Default Uniswap fee tier (0.3%)
  default_uniswap_fee: 3000
# ============================================================================
# SWAP CONFIGURATION
# ============================================================================
swaps:
  # Amount to swap for each token during initial setup (10 ETH in wei)
  initial_swap_amount: "10000000000000000000"
  # Function selector for Uniswap V3 exactInputSingle
  uniswap_exact_input_single_selector: "0x414bf389"
# ============================================================================
# EVM ENVIRONMENT CONFIGURATION
# ============================================================================
# These values override the defaults in evm_config.py
# Question generation LLM settings
question_generation_model: "gpt-4o-mini"
question_generation_temperature: 0.6
question_generation_max_tokens: 256
question_generation_n: 3
# Question selection strategy
weak_performance_threshold: 0.9
weak_area_focus_ratio: 0.8


@ -0,0 +1,66 @@
from typing import List
from pydantic import Field
from pydantic_settings import BaseSettings
from atroposlib.envs.base import BaseEnvConfig
class EVMEnvConfig(BaseEnvConfig, BaseSettings):
"""Configuration for the EVM Environment"""
# Logging configuration
debug_logging: bool = Field(
default=False, description="Enable detailed debug logging"
)
suppress_base_env_logs: bool = Field(
default=True,
description="Suppress base environment INFO logs to reduce noise",
)
# Anvil configuration
anvil_config_path: str = Field(
"configs/token_transfers.yaml",
description="Path to Anvil configuration YAML file",
)
max_steps: int = Field(1, description="Only one step per transaction episode")
question_types: List[str] = Field(
default=[
"ETH transfer",
"ERC-20 transfer using 18 decimal token",
"ERC-20 transfer using a non-18 decimal token",
],
description="Types of questions to generate for the agent",
)
# Question selection strategy configuration
weak_performance_threshold: float = Field(
default=0.9,
description="Performance threshold below which question types are considered weak (0.0-1.0)",
)
weak_area_focus_ratio: float = Field(
default=0.8,
description="Probability of focusing on weak areas vs strong areas (0.0-1.0)",
)
# LLM generation configuration for dynamic questions
question_generation_model: str = Field(
default="gpt-4o-mini",
description="Model to use for generating dynamic questions",
)
question_generation_temperature: float = Field(
default=0.6,
description="Temperature for question generation (0.0-2.0)",
)
question_generation_max_tokens: int = Field(
default=256,
description="Maximum tokens for question generation",
)
question_generation_n: int = Field(
default=3,
description="Number of responses to generate per question generation call",
)
class Config:
env_file = "configs/token_transfers.yaml"
env_file_encoding = "utf-8"
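
The field descriptions above document value ranges (for example 0.0-1.0 for the two selection settings), but the fields as declared do not enforce them. The sketch below shows one way such ranges could be checked with the standard library only. `SelectionSettings` and `_check_range` are hypothetical names, not part of the environment. In the pydantic model itself, the `ge` and `le` arguments of `Field` offer the same kind of bound checking.

```python
from dataclasses import dataclass


def _check_range(name: str, value: float, low: float, high: float) -> None:
    """Raise ValueError if value lies outside the closed interval [low, high]."""
    if not low <= value <= high:
        raise ValueError(f"{name}={value} is outside [{low}, {high}]")


@dataclass(frozen=True)
class SelectionSettings:
    """Hypothetical stand-in for three fields of the config class."""

    weak_performance_threshold: float = 0.9
    weak_area_focus_ratio: float = 0.8
    question_generation_temperature: float = 0.6

    def __post_init__(self) -> None:
        # Ranges taken from the field descriptions
        _check_range(
            "weak_performance_threshold", self.weak_performance_threshold, 0.0, 1.0
        )
        _check_range("weak_area_focus_ratio", self.weak_area_focus_ratio, 0.0, 1.0)
        _check_range(
            "question_generation_temperature",
            self.question_generation_temperature,
            0.0,
            2.0,
        )
```

Constructing `SelectionSettings(weak_area_focus_ratio=1.5)` raises a `ValueError` at creation time rather than producing odd sampling behaviour later.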

@ -0,0 +1,779 @@
#!/usr/bin/env python3
"""
EVM Environment for Atropos: Ethereum Virtual Machine Transaction Agent Training
This environment trains language models to generate and execute profitable Ethereum transactions
using Anvil (Foundry's local blockchain simulation).
"""
import json
import logging
import os
import random
import re
import sys
import traceback
from typing import Any, Dict, List, Optional, Tuple
from anvil import AnvilBackend, AnvilConfig
from evm_config import EVMEnvConfig
from openai import OpenAI
from utils import cleanup_blockchain, cleanup_manager, setup_evm_error_message
from atroposlib.envs.base import BaseEnv, ScoredDataGroup
from atroposlib.envs.server_handling.server_manager import APIServerConfig
from atroposlib.type_definitions import Item
from atroposlib.utils.tokenize_for_trainer import tokenize_for_trainer
# Add logger
logger = logging.getLogger(__name__)
# System prompt for EVM transaction agent
system_prompt = (
"You are a deep thinking AI, you may use extremely long chains of thought "
"to deeply consider the problem and deliberate with yourself via systematic "
"reasoning processes to help come to a correct solution prior to answering. "
"You should enclose your thoughts and internal monologue inside <think> </think> "
"tags, and then provide your solution or response to the problem.\n\n"
)
system_prompt += """You are allowed to use a maximum of 2048 tokens. Please strive to use fewer.
You are here to assist a user in executing transfers of both ETH and ERC-20 tokens as requested.
Your job is to generate correct Ethereum transaction data for the requested action.
IMPORTANT: After your thinking, your response must include a valid JSON transaction object:
{"to": "0x...", "value": "amount_in_wei", "data": "0x..."}
- 'to': The recipient address (contract or EOA)
- 'value': Amount of ETH to send in wei (string)
- 'data': Transaction data
If you do not provide a valid JSON transaction object, your submission will be ignored and you \
will receive a score of -1.0.
Example 1:
{
"to": "0xe688b84b23f322a994A53dbF8E15FA82CDB71127",
"value": "10000000000000000",
"data": "0x"
}
Example 2:
{
"to": "0xdAC17F958D2ee523a2206206994597C13D831ec7",
"value": "0",
"data": "0xa9059cbb000000000000000000000000ea29e9da69317d80075fbfc836e843c6d65971f50000000000000000000000000000000000000000000000000000000005f5e100" # noqa: E501
}
"""
class EVMEnv(BaseEnv):
"""EVM Transaction Environment for training agents to interact with Ethereum"""
name = "evm_agent"
env_config_cls = EVMEnvConfig
def __init__(
self,
config: EVMEnvConfig,
server_configs: List[APIServerConfig],
slurm=True,
testing=False,
):
"""Initialize the EVM environment"""
super().__init__(config, server_configs, slurm, testing)
# Set up minimal logging: warnings and errors only, unless debug_logging is enabled
self.logger = logging.getLogger(f"{self.__class__.__name__}")
self.logger.setLevel(logging.DEBUG if config.debug_logging else logging.WARNING)
self.logger.propagate = False
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter("%(message)s") # Clean format
handler.setFormatter(formatter)
self.logger.addHandler(handler)
# Suppress base environment logs
if config.suppress_base_env_logs:
base_logger = logging.getLogger("atroposlib.envs.base")
base_logger.setLevel(logging.WARNING)
# Load Anvil configuration
self.anvil_config = AnvilConfig(config.anvil_config_path)
# Initialize blockchain handler
self.blockchain = AnvilBackend(self.anvil_config)
# Performance tracking for adaptive question selection
self.question_performance = {qtype: [] for qtype in config.question_types}
self.current_question_type = None
# Store current prompt data for scoring
self.current_prompt_data = None
# Register cleanup with the global cleanup manager
cleanup_manager.register_cleanup(cleanup_blockchain, self.blockchain)
async def setup(self):
"""Setup the EVM environment and start Anvil"""
try:
print("Starting Anvil blockchain simulation...")
self.blockchain.start()
self.blockchain.setup_wallet()
print("EVM environment setup completed successfully.")
except Exception as e:
error_message = setup_evm_error_message(self.anvil_config, e)
print(error_message)
# Cleanup and exit
cleanup_blockchain(self.blockchain)
sys.exit(1)
async def get_next_item(self) -> Optional[Item]:
"""Generate the next transaction challenge for the agent"""
try:
# Select question type based on performance (exploration vs exploitation)
question_type = self._select_question_type()
self.current_question_type = question_type
# Generate question prompt and get structured data
prompt_text, prompt_data = await self._generate_question_prompt(
question_type
)
# Store the prompt data for scoring
self.current_prompt_data = prompt_data
# Display Generated Input
self.logger.debug("\n=== Generated Input ===")
self.logger.debug(prompt_text)
self.logger.debug("=" * 50)
prompt = tuple(
[frozenset({"role": "user", "content": prompt_text}.items())]
)
return (prompt, None, None)
except Exception as e:
print(f"Error in get_next_item: {e}")
traceback.print_exc()
return None
def _select_question_type(self) -> str:
"""Select a question type, favouring weak areas with probability weak_area_focus_ratio (default 0.8)"""
# If no performance data yet, select randomly
if not any(self.question_performance.values()):
return random.choice(self.config.question_types)
# Calculate average scores for each question type
avg_scores = {}
for qtype, scores in self.question_performance.items():
if scores:
avg_scores[qtype] = sum(scores) / len(scores)
else:
avg_scores[qtype] = 0.0 # Prioritize untested question types
# Split into weak and strong areas based on configurable performance threshold
weak_threshold = self.config.weak_performance_threshold
weak_qtypes = [
qtype for qtype, score in avg_scores.items() if score < weak_threshold
]
strong_qtypes = [
qtype for qtype, score in avg_scores.items() if score >= weak_threshold
]
# Configurable focus on weak areas vs strong areas for mastery maintenance
if random.random() < self.config.weak_area_focus_ratio and weak_qtypes:
selected_type = random.choice(weak_qtypes)
elif strong_qtypes:
selected_type = random.choice(strong_qtypes)
else:
selected_type = random.choice(list(avg_scores.keys()))
return selected_type
async def _generate_question_prompt(
self, question_type: str
) -> Tuple[str, Optional[Dict[str, Any]]]:
"""Generate a dynamic question prompt using LLM based on the question type"""
# Initialize OpenAI client
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
# Create prompt for LLM to generate a request
llm_prompt = f"""You are generating a natural language transaction request for an Ethereum blockchain agent.
TRANSACTION TYPE: "{question_type}"
CONTEXT:
- Wallet Address: {self.anvil_config.funding.custom_wallet}
- Current Balances: {json.dumps(self.blockchain.get_wallet_balances(), indent=2)}
TASK:
Generate a realistic, conversational transaction request that:
1. Matches the specified transaction type exactly
2. Stays within the current wallet balances; amounts are usually small but occasionally larger,
the way a real person would use their assets
3. Includes all necessary details (token, amount, destination address)
4. Sounds like how a real user would naturally request a transaction
5. Varies in tone and style (casual, formal, urgent, etc.)
REQUIREMENTS:
- Use realistic destination addresses (not placeholder text like
"0x1234567890123456789012345678901234567890")
- Do not specify amounts larger than 50% of the current balance
- Make the request executable
Generate ONE natural language request that matches the transaction type "{question_type}".
Respond with a JSON object with the following fields:
- question_type: The type of transaction to generate
- request: The natural language request text
- destination_address: The destination address
- transfer_token: The token to transfer
- transfer_amount: The amount to transfer
Examples:
1. ETH transfer
{{
"question_type": "ETH transfer",
"request": "yo, can i send 0.01 ETH to my buddy jasper? His address is jasper.eth",
"destination_address": "jasper.eth",
"transfer_token": "ETH",
"transfer_amount": "0.01"
}}
2. ERC-20 transfer using 18 decimal token
{{
"question_type": "ERC-20 transfer using 18 decimal token",
"request": "Send 100 CRV to 0xe688b84b23f322a994A53dbF8E15FA82CDB71127",
"destination_address": "0xe688b84b23f322a994A53dbF8E15FA82CDB71127",
"transfer_token": "CRV",
"transfer_amount": "100"
}}
3. ERC-20 transfer using a non-18 decimal token (e.g. USDT)
{{
"question_type": "ERC-20 transfer using a non-18 decimal token",
"request": "give 100 tether to 0xea29e9da69317d80075fbfc836e843c6d65971f5",
"destination_address": "0xea29e9da69317d80075fbfc836e843c6d65971f5",
"transfer_token": "USDT",
"transfer_amount": "100"
}}
"""
try:
# Generate multiple responses in a single call for efficiency
response = client.chat.completions.create(
model=self.config.question_generation_model,
messages=[{"role": "user", "content": llm_prompt}],
temperature=self.config.question_generation_temperature,
max_tokens=self.config.question_generation_max_tokens,
n=self.config.question_generation_n,
)
# Try each response until we find a valid one
for i, choice in enumerate(response.choices):
generated_content = choice.message.content.strip()
# Extract JSON from response using generic function
prompt_data = self._extract_json_from_response(
generated_content, ["question_type", "request"], "prompt"
)
# Validate required fields
if prompt_data and self._validate_prompt_data(
prompt_data, question_type
):
return prompt_data["request"], prompt_data
# All choices failed, use fallback
fallback_data = {
"question_type": question_type,
"request": "Transfer 0.01 ETH to 0x0000000000000000000000000000000000000000",
"destination_address": "0x0000000000000000000000000000000000000000",
"transfer_token": "ETH",
"transfer_amount": "0.01",
}
return fallback_data["request"], fallback_data
except Exception:
fallback_data = {
"question_type": question_type,
"request": "Transfer 0.01 ETH to 0x0000000000000000000000000000000000000000",
"destination_address": "0x0000000000000000000000000000000000000000",
"transfer_token": "ETH",
"transfer_amount": "0.01",
}
return fallback_data["request"], fallback_data
def _extract_json_from_response(
self, response: str, required_keys: List[str], json_type: str = "JSON"
) -> Optional[Dict[str, Any]]:
"""Generic JSON extraction from LLM response, handling thinking tags"""
if not isinstance(response, str):
return None
# First, try to extract content after thinking tags (following SWE pattern)
content_after_think = response
think_end_match = re.search(r"</think>", response, re.IGNORECASE)
if think_end_match:
content_after_think = response[think_end_match.end() :].strip()
# Create patterns based on required keys
if len(required_keys) >= 2:
key1, key2 = required_keys[0], required_keys[1]
json_patterns = [
rf'\{{[^{{}}]*"{key1}"[^{{}}]*"{key2}"[^{{}}]*\}}', # Simple pattern with first two keys
rf'\{{.*?"{key1}".*?"{key2}".*?\}}', # Flexible pattern with first two keys
r"\{.*?\}", # Any JSON object
]
else:
json_patterns = [r"\{.*?\}"] # Fallback to any JSON
for pattern in json_patterns:
matches = re.findall(pattern, content_after_think, re.DOTALL)
for match in matches:
try:
# Clean up the JSON string
json_str = match.strip()
# Parse the JSON
obj = json.loads(json_str)
# Verify it has the expected structure (every required key is present)
if isinstance(obj, dict) and all(
key in obj for key in required_keys
):
return obj
except json.JSONDecodeError:
continue
return None
def _extract_transaction_json(self, response: str) -> Optional[Dict[str, Any]]:
"""Extract transaction JSON from LLM response"""
return self._extract_json_from_response(
response, ["to", "value"], "transaction"
)
def _validate_prompt_data(
self, prompt_data: Dict[str, Any], expected_question_type: str
) -> bool:
"""Validate that prompt data has all required fields and correct question type"""
required_fields = [
"question_type",
"request",
"destination_address",
"transfer_token",
"transfer_amount",
]
# Check all required fields are present
if not all(field in prompt_data for field in required_fields):
return False
# Check question type matches what we requested
if prompt_data["question_type"] != expected_question_type:
return False
# Check that fields are not empty
for field in required_fields:
if not prompt_data[field] or str(prompt_data[field]).strip() == "":
return False
return True
async def collect_trajectories(
self, item: Item
) -> Tuple[Optional[ScoredDataGroup], List[Item]]:
"""Collect trajectories by having the agent generate transactions"""
to_score = []
to_backlog = []
system_msg = {
"role": "system",
"content": system_prompt,
}
user_msg = {"role": "user", "content": dict(item[0][0])["content"]}
messages = [system_msg, user_msg]
try:
# Use proper Atropos framework pattern like humor generation
chat_completions = await self.server.chat_completion(
messages=messages,
n=self.config.group_size,
max_tokens=2048,
)
# Store completions for output saving
self.last_completions = []
for i, choice in enumerate(chat_completions.choices):
# Store the completion
self.last_completions.append(choice.message.content)
# Display Generated Output
self.logger.debug(f"\n=== Generated Output {i+1} ===")
self.logger.debug(choice.message.content)
self.logger.debug("=" * 50)
history = [
{"role": "system", "content": system_msg["content"]},
{"role": "user", "content": user_msg["content"]},
{"role": "assistant", "content": choice.message.content},
]
to_score.append((history, item[1], None))
except Exception as e:
print(f"Error in collect_trajectories: {e}")
traceback.print_exc()
to_backlog.append(item)
if not to_score:
return None, to_backlog
scored_data = await self.score(to_score)
return scored_data, to_backlog
async def score(self, rollout_group_data) -> Optional[ScoredDataGroup]:
"""Score the generated transactions by executing them on Anvil"""
if not rollout_group_data:
return None
scores = ScoredDataGroup()
scores["tokens"] = []
scores["masks"] = []
scores["scores"] = []
scores["advantages"] = None
scores["ref_logprobs"] = None
scores["messages"] = None
scores["group_overrides"] = {"group_size": self.config.group_size}
scores["overrides"] = None
scores["ground_truths"] = []
for i, item in enumerate(rollout_group_data):
out = tokenize_for_trainer(self.tokenizer, item[0])
tokens = out["tokens"]
masks = out["masks"]
try:
# Extract the agent's response (transaction JSON)
agent_response = item[0][-1]["content"].strip()
ground_truth = item[1] if isinstance(item[1], str) else ""
# Score the transaction
score = await self._score_transaction(agent_response)
# Display Score
self.logger.debug(f"\n=== Score {i+1} ===")
self.logger.debug(f"{score}")
self.logger.debug("=" * 50)
# Track performance for this question type
if self.current_question_type:
self.question_performance[self.current_question_type].append(score)
# Keep only last 10 scores per question type
if len(self.question_performance[self.current_question_type]) > 10:
self.question_performance[self.current_question_type].pop(0)
except Exception as e:
score = -1.0
ground_truth = item[1] if isinstance(item[1], str) else ""
# Display Score for error case
print(f"\n=== Score {i+1} ===")
print(f"{score} (Error: {e})")
print("=" * 50)
# Skip if too few trainable (unmasked) tokens
if sum(1 for mask in masks if mask != -100) < 10:
continue
scores["tokens"].append(tokens)
scores["masks"].append(masks)
scores["scores"].append(score)
scores["ground_truths"].append(ground_truth)
if len(scores["tokens"]) >= self.config.group_size:
break
if not scores["tokens"]:
return None
return scores
async def _score_transaction(self, agent_response: str) -> float:
"""Score a transaction based on multiple criteria"""
try:
# First, extract JSON from the response (handling thinking tags)
tx_obj = self._extract_transaction_json(agent_response)
if tx_obj is None:
return -1.0 # Could not extract valid JSON
# Validate required fields
if not all(field in tx_obj for field in ["to", "value", "data"]):
return -1.0 # Missing required fields
# Get expected transfer details from stored prompt data
if not hasattr(self, "current_prompt_data") or not self.current_prompt_data:
return -1.0
expected_token = self.current_prompt_data.get("transfer_token", "ETH")
expected_amount = self.current_prompt_data.get("transfer_amount", "0")
expected_destination = self.current_prompt_data.get(
"destination_address", ""
)
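# Get sender and recipient addresses. Balances are checked on the requested
# recipient: for ERC-20 transfers the transaction's 'to' is the token contract.
sender_address = self.anvil_config.funding.custom_wallet
destination_address = expected_destination or tx_obj.get("to", "")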
# Get relevant tokens to check
relevant_tokens = ["ETH"]
if expected_token != "ETH":
relevant_tokens.append(expected_token)
# Take a snapshot before execution
snapshot_id = self.blockchain.snapshot()
# Get pre-execution balances for both addresses
pre_balances = {
"sender": self.blockchain.get_wallet_balances(
sender_address, relevant_tokens
),
"destination": self.blockchain.get_wallet_balances(
destination_address, relevant_tokens
),
}
try:
# Execute the transaction
result = self.blockchain.execute_transaction(tx_obj)
# Get post-execution balances
post_balances = {
"sender": self.blockchain.get_wallet_balances(
sender_address, relevant_tokens
),
"destination": self.blockchain.get_wallet_balances(
destination_address, relevant_tokens
),
}
# Calculate score based on execution result and balance changes
score = self._calculate_transaction_score(
tx_obj,
result,
agent_response,
pre_balances,
post_balances,
expected_token,
expected_amount,
expected_destination,
)
# Revert to snapshot to maintain clean state
self.blockchain.revert(snapshot_id)
return score
except Exception:
# Revert on any error
self.blockchain.revert(snapshot_id)
return -1.0 # Execution error
except Exception:
return -1.0 # General error
def _calculate_transaction_score(
self,
tx_obj: Dict[str, Any],
result: Dict[str, Any],
agent_response: str,
pre_balances: Dict[str, Dict[str, Any]],
post_balances: Dict[str, Dict[str, Any]],
transfer_token: str,
transfer_amount: str,
destination_address: str,
) -> float:
"""Calculate score based on transaction execution results and balance changes"""
base_score = 0.0
# 1. Successful execution (0.3 points)
if result.get("status") == "0x1":
base_score += 0.3 # Transaction succeeded
# 2. Correct transaction - exact balance verification (0.5 points)
balance_score = self._verify_expected_transfers(
pre_balances,
post_balances,
transfer_token,
transfer_amount,
destination_address,
)
base_score += balance_score
# 3. Thinking quality (max 0.1 points, with negative for missing thinking)
thinking_score = self._analyze_thinking_quality(agent_response)
base_score += thinking_score # Range: -0.2 to +0.1
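# 4. Recipient verification (0.05 points)
# ETH: 'to' is the recipient. ERC-20: 'to' is the token contract, so the
# recipient is the first ABI-encoded argument of transfer(address,uint256).
tx_recipient = (
str(tx_obj.get("to", "")).lower()
if transfer_token == "ETH"
else "0x" + str(tx_obj.get("data", ""))[34:74].lower()
)
expected_to = destination_address.lower()
if tx_recipient == expected_to:
base_score += 0.05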
# 5. Data field verification (0.05 points)
data_field = tx_obj.get("data", "0x")
if transfer_token == "ETH":
# ETH transfer should have empty data field
if data_field == "0x":
base_score += 0.05
else:
# ERC-20 transfer should have transfer function call data
if data_field.startswith("0xa9059cbb"): # transfer function selector
base_score += 0.05
return base_score
def _verify_expected_transfers(
self,
pre_balances: Dict[str, Dict[str, Any]],
post_balances: Dict[str, Dict[str, Any]],
expected_token: str,
expected_amount: str,
expected_destination: str,
) -> float:
"""Verify that the expected transfer amounts occurred"""
try:
expected_amount_float = float(expected_amount)
# For ETH transfers - only check destination address
if expected_token == "ETH":
# Extract balance values from the nested dictionary structure
dest_pre_balance = (
pre_balances["destination"].get("ETH", {}).get("balance", 0)
)
dest_post_balance = (
post_balances["destination"].get("ETH", {}).get("balance", 0)
)
dest_eth_change = float(dest_post_balance) - float(dest_pre_balance)
# Check if destination gained exactly the expected amount
if abs(dest_eth_change - expected_amount_float) < 1e-10:
return 0.5
# For ERC-20 transfers - check both sender and destination
else:
# Extract balance values from the nested dictionary structure
sender_pre_balance = (
pre_balances["sender"].get(expected_token, {}).get("balance", 0)
)
sender_post_balance = (
post_balances["sender"].get(expected_token, {}).get("balance", 0)
)
dest_pre_balance = (
pre_balances["destination"]
.get(expected_token, {})
.get("balance", 0)
)
dest_post_balance = (
post_balances["destination"]
.get(expected_token, {})
.get("balance", 0)
)
sender_token_change = float(sender_post_balance) - float(
sender_pre_balance
)
dest_token_change = float(dest_post_balance) - float(dest_pre_balance)
# For ERC-20, expect exact amounts (no gas costs in token)
if (
abs(sender_token_change + expected_amount_float) < 1e-6
and abs(dest_token_change - expected_amount_float) < 1e-6
):
return 0.5
return 0.0 # Transfer amounts don't match expectations
except (ValueError, TypeError):
return 0.0
def _analyze_thinking_quality(self, response: str) -> float:
"""Evaluate thinking tag quality with max 0.1 points, negative for missing thinking"""
thinking_score = 0.0
# Check for thinking tags
if "<think>" not in response or "</think>" not in response:
return -0.2 # Penalty for no thinking tags
# Extract thinking content
try:
thinking_match = re.search(r"<think>(.*?)</think>", response, re.DOTALL)
if not thinking_match:
return -0.2 # No thinking content found
thinking_content = thinking_match.group(1).strip()
if not thinking_content:
return -0.1 # Empty thinking tags
# Basic quality assessment for positive score (max 0.1)
word_count = len(thinking_content.split())
# Award points based on thinking depth
if word_count >= 50: # Substantial thinking
thinking_score += 0.1
elif word_count >= 20: # Moderate thinking
thinking_score += 0.05
elif word_count >= 5: # Minimal thinking
thinking_score += 0.02
return thinking_score
except Exception:
return -0.1 # Error in processing thinking
async def evaluate(self, *args, **kwargs):
"""Evaluation method - could implement portfolio performance tracking"""
return
def close(self):
"""Clean up resources"""
cleanup_blockchain(self.blockchain)
@classmethod
def config_init(cls) -> Tuple[EVMEnvConfig, List[APIServerConfig]]:
"""Initialize configuration for EVM environment"""
# Explicit settings for this run; fields not set here fall back to the EVMEnvConfig defaults
env_config = EVMEnvConfig(
tokenizer_name="NousResearch/Hermes-3-Llama-3.1-8B",
group_size=4,
use_wandb=True,
rollout_server_url="http://localhost:8000",
total_steps=500,
batch_size=16,
steps_per_eval=50,
max_token_length=2048,
wandb_name="evm-agent",
anvil_config_path="configs/token_transfers.yaml",
)
# API server configuration
server_configs = [
APIServerConfig(
model_name="gpt-4o-mini",
base_url=None, # Use OpenAI directly
api_key=os.environ.get("OPENAI_API_KEY"),
num_requests_for_eval=64,
),
]
return env_config, server_configs
if __name__ == "__main__":
EVMEnv.cli()
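
The system prompt asks for `value` in wei and, for token transfers, `data` holding an ERC-20 `transfer` call. As a hedged illustration of what a correct answer looks like, the sketch below builds both transaction shapes with the standard library. The helper names are hypothetical and not part of the environment, and the token address used in the usage note is the mainnet USDT contract, chosen only as an example. `Decimal` is used so that amounts such as "0.01" convert exactly.

```python
from decimal import Decimal

# First 4 bytes of keccak256("transfer(address,uint256)")
TRANSFER_SELECTOR = "a9059cbb"


def to_base_units(amount: str, decimals: int) -> int:
    """Convert a human-readable amount such as "0.01" to integer base units."""
    scaled = Decimal(amount) * (Decimal(10) ** decimals)
    if scaled != scaled.to_integral_value():
        raise ValueError(f"{amount} has more than {decimals} decimal places")
    return int(scaled)


def eth_transfer_tx(recipient: str, amount_eth: str) -> dict:
    """Plain ETH transfer: value in wei, empty calldata."""
    return {"to": recipient, "value": str(to_base_units(amount_eth, 18)), "data": "0x"}


def erc20_transfer_tx(
    token_contract: str, recipient: str, amount: str, decimals: int
) -> dict:
    """ERC-20 transfer: sent to the token contract, recipient lives in calldata."""
    if not recipient.lower().startswith("0x") or len(recipient) != 42:
        raise ValueError("recipient must be a 20-byte hex address")
    # Each ABI argument is left-padded to 32 bytes (64 hex characters)
    address_word = recipient.lower()[2:].rjust(64, "0")
    amount_word = format(to_base_units(amount, decimals), "064x")
    return {
        "to": token_contract,
        "value": "0",
        "data": "0x" + TRANSFER_SELECTOR + address_word + amount_word,
    }
```

For instance, `erc20_transfer_tx("0xdAC17F958D2ee523a2206206994597C13D831ec7", "0xEA29e9da69317d80075fBfc836E843C6d65971F5", "100", 6)` encodes 100 units of a 6-decimal token as the amount word ending in `05f5e100`.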

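
The environment pulls a JSON object out of a model response that may contain `<think>` blocks and surrounding prose. Regular expressions such as `\{.*?\}` stop at the first closing brace and so cannot match nested objects. As an alternative technique (not what the file above does), the sketch below scans for `{` and lets `json.JSONDecoder.raw_decode` find where each candidate object ends. The function name `extract_json_object` is an illustrative assumption.

```python
import json
import re
from typing import Any, Dict, Iterable, Optional


def extract_json_object(
    text: str, required_keys: Iterable[str]
) -> Optional[Dict[str, Any]]:
    """Return the first JSON object after </think> containing all required keys."""
    required = list(required_keys)
    # Ignore everything up to and including the first closing think tag
    match = re.search(r"</think>", text, re.IGNORECASE)
    body = text[match.end():] if match else text

    decoder = json.JSONDecoder()
    pos = body.find("{")
    while pos != -1:
        try:
            # raw_decode parses one JSON value starting exactly at pos
            obj, _end = decoder.raw_decode(body, pos)
        except json.JSONDecodeError:
            obj = None
        if isinstance(obj, dict) and all(key in obj for key in required):
            return obj
        # Not valid or not the object we want: try the next opening brace
        pos = body.find("{", pos + 1)
    return None
```

Because the decoder tracks brace nesting itself, a transaction object with a nested field is returned whole, and stray braces inside the prose are skipped.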
@ -0,0 +1,3 @@
# EVM Environment specific dependencies
# (openai, pydantic, requests, gymnasium, numpy are provided by atroposlib)
pyyaml>=6.0

@ -0,0 +1,165 @@
#!/bin/bash
# Ethereum Virtual Machine Environment Setup Script
# This script installs Foundry/Anvil and sets up the Python environment
set -e # Exit on any error
echo "🔧 Setting up Ethereum Virtual Machine Environment..."
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Function to print colored output
print_status() {
echo -e "${GREEN}[INFO]${NC} $1"
}
print_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Check if Foundry is already installed
check_foundry() {
if command -v anvil &> /dev/null && command -v cast &> /dev/null && command -v forge &> /dev/null; then
print_status "Foundry is already installed"
anvil --version
return 0
else
return 1
fi
}
# Install Foundry
install_foundry() {
print_status "Installing Foundry..."
# Download and install Foundry
curl -L https://foundry.paradigm.xyz | bash
# Add Foundry's default install directory to PATH for this session
# (sourcing ~/.bashrc or ~/.zshrc from a non-interactive bash script is unreliable)
export PATH="$HOME/.foundry/bin:$PATH"
# Run foundryup to install the latest version
if command -v foundryup &> /dev/null; then
foundryup
else
print_warning "foundryup not found in PATH. Please restart your terminal and run 'foundryup'"
print_warning "Then run this setup script again."
exit 1
fi
}
# Verify installation
verify_installation() {
print_status "Verifying installation..."
if ! command -v anvil &> /dev/null; then
print_error "Anvil not found. Installation may have failed."
print_error "Please restart your terminal and try again."
exit 1
fi
if ! command -v cast &> /dev/null; then
print_error "Cast not found. Installation may have failed."
exit 1
fi
print_status "✅ Foundry tools installed successfully:"
echo " - $(anvil --version)"
echo " - $(cast --version)"
echo " - $(forge --version)"
}
# Install Python dependencies
install_python_deps() {
print_status "Installing Python dependencies..."
if [ -f "requirements.txt" ]; then
pip install -r requirements.txt
print_status "Installed EVM-specific dependencies from requirements.txt"
else
print_warning "requirements.txt not found. Installing minimal dependencies..."
pip install "pyyaml>=6.0"
fi
print_status "Note: Main dependencies (openai, pydantic, requests) are provided by atroposlib"
}
# Check for OpenAI API key
check_openai_key() {
if [ -z "$OPENAI_API_KEY" ]; then
print_warning "OPENAI_API_KEY environment variable not set"
echo " Set it with: export OPENAI_API_KEY='your-api-key-here'"
echo " This is required for question generation"
else
print_status "✅ OPENAI_API_KEY is set"
fi
}
# Test the configuration
test_config() {
print_status "Testing configuration..."
python -c "
try:
from anvil import AnvilConfig
config = AnvilConfig()
print('✅ Configuration loaded successfully')
print(f' - Config file: {config.config_file}')
print(f' - Network port: {config.anvil.port}')
print(f' - Fork URL: {config.anvil.fork_url}')
print(f' - Custom wallet: {config.funding.custom_wallet}')
except Exception as e:
print(f'❌ Configuration test failed: {e}')
exit(1)
"
}
# Main setup process
main() {
echo "🚀 Starting setup process..."
echo
# Check if already installed
if check_foundry; then
print_status "Foundry already installed, skipping installation"
else
install_foundry
verify_installation
fi
echo
install_python_deps
echo
check_openai_key
echo
test_config
echo
print_status "🎉 Setup completed successfully!"
echo
echo "Next steps:"
echo " 1. Configure configs/token_transfers.yaml if needed"
echo " 2. Set OPENAI_API_KEY if not already set"
echo " 3. Run inference: python evm_server.py process --env.data_path_to_save_groups evm_rollouts.jsonl"
echo " 4. Or run training: python evm_server.py serve"
echo
echo "For troubleshooting, see README.md"
}
# Run main function
main "$@"
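
The script's Foundry check can also be done from Python, for example as a preflight step before the environment tries to launch Anvil. This is a small sketch using only `shutil.which`; the function name `find_missing_tools` is hypothetical and not part of the repository.

```python
import shutil
from typing import Iterable, List


def find_missing_tools(tools: Iterable[str] = ("anvil", "cast", "forge")) -> List[str]:
    """Return the names of the given executables that are not on PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


if __name__ == "__main__":
    missing = find_missing_tools()
    if missing:
        print("Missing Foundry tools: " + ", ".join(missing))
    else:
        print("All Foundry tools found on PATH")
```

An empty result means all three binaries resolve on the current PATH; it says nothing about their versions.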

@ -0,0 +1,84 @@
"""
Utility functions for the EVM Environment
This module contains cleanup handlers, signal management, and other utility functions.
"""
import atexit
import logging
import signal
import sys
class CleanupManager:
"""Manages cleanup operations for the EVM environment"""
def __init__(self):
self.cleanup_functions = []
self.logger = logging.getLogger(__name__)
self._setup_handlers()
def _setup_handlers(self):
"""Setup cleanup handlers for various exit scenarios"""
# Register cleanup function to run on normal exit
atexit.register(self._execute_cleanup)
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGINT, self._signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, self._signal_handler) # Termination signal
# On Windows, also handle SIGBREAK
if hasattr(signal, "SIGBREAK"):
signal.signal(signal.SIGBREAK, self._signal_handler)
def register_cleanup(self, cleanup_func, *args, **kwargs):
"""Register a cleanup function to be called on exit"""
self.cleanup_functions.append((cleanup_func, args, kwargs))
def _signal_handler(self, signum, frame):
"""Handle shutdown signals gracefully"""
print(f"\nReceived signal {signum}, shutting down gracefully...")
self._execute_cleanup()
sys.exit(0)
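def _execute_cleanup(self):
"""Execute all registered cleanup functions (each at most once)"""
# Drain the list first: after a signal, sys.exit() triggers the atexit hook,
# which would otherwise run every cleanup function a second time.
cleanup_functions, self.cleanup_functions = self.cleanup_functions, []
for cleanup_func, args, kwargs in cleanup_functions:
try:
cleanup_func(*args, **kwargs)
except Exception as e:
print(f"Error during cleanup: {e}")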
def setup_evm_error_message(anvil_config, error: Exception) -> str:
"""Generate a comprehensive error message for EVM setup failures"""
error_message = f"\n❌ Error setting up EVM environment: {error}"
error_message += "\n\n🔧 Troubleshooting suggestions:"
error_message += "\n 1. Check if Anvil is already running on the configured port"
error_message += "\n 2. Ensure no previous Anvil processes are still running:"
error_message += "\n - Run: pkill -f anvil"
error_message += "\n - Or: ps aux | grep anvil"
error_message += "\n 3. Verify Foundry/Anvil is properly installed:"
error_message += "\n - Run: anvil --version"
error_message += "\n 4. Check if the port is available:"
error_message += f"\n - Run: netstat -tulpn | grep {anvil_config.anvil.port}"
error_message += (
"\n\n💡 Try restarting the environment after addressing these issues."
)
return error_message
def cleanup_blockchain(blockchain) -> None:
"""Clean up blockchain resources"""
try:
if blockchain:
print("Stopping Anvil blockchain...")
blockchain.stop()
print("Anvil stopped successfully.")
except Exception as e:
print(f"Error during blockchain cleanup: {e}")
# Global cleanup manager instance
cleanup_manager = CleanupManager()
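
A cleanup routine that is reachable both from a signal handler and from an `atexit` hook can be invoked more than once during a single shutdown. One way to make repeated invocations harmless is to drain the list of pending callbacks before running them. The class below is a minimal sketch of that idea under a hypothetical name, `OnceCleanup`; it leaves out the signal and `atexit` wiring entirely.

```python
from typing import Any, Callable, Dict, List, Tuple

Callback = Tuple[Callable[..., Any], Tuple[Any, ...], Dict[str, Any]]


class OnceCleanup:
    """Registry whose callbacks run at most once, however often run() is called."""

    def __init__(self) -> None:
        self._pending: List[Callback] = []

    def register(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """Queue a callback for the next run()."""
        self._pending.append((func, args, kwargs))

    def run(self) -> int:
        """Run and forget all pending callbacks; return how many succeeded."""
        # Swap the list out first so a nested or repeated call sees nothing to do
        pending, self._pending = self._pending, []
        succeeded = 0
        for func, args, kwargs in pending:
            try:
                func(*args, **kwargs)
                succeeded += 1
            except Exception as exc:
                # One failing callback should not block the others
                print(f"Error during cleanup: {exc}")
        return succeeded
```

Calling `run()` a second time returns 0 and touches nothing, so it is safe to call from several exit paths.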