diff --git a/README.md b/README.md index 62b31c77..c91d0949 100644 --- a/README.md +++ b/README.md @@ -297,6 +297,17 @@ Always refer to the specific environment script's help for all available options python environments/your_environment_script.py process --help ``` +### Environment Evaluation with `evaluate` + +For running evaluation on environments, Atropos provides an `evaluate` subcommand that calls the environment's `evaluate` method: + +```sh +python gsm8k_server.py evaluate \ + --openai.base_url https://openrouter.ai/api/v1 \ + --openai.api_key $OPENROUTER_API_KEY \ + --openai.model_name qwen/qwen3-14b +``` + ### Offline Data Generation Quick Start Run the below in separate terminals: diff --git a/atroposlib/envs/base.py b/atroposlib/envs/base.py index 99917546..6abf13d8 100644 --- a/atroposlib/envs/base.py +++ b/atroposlib/envs/base.py @@ -1142,17 +1142,25 @@ class BaseEnv(ABC): generate_html(self.config.data_path_to_save_groups) + async def _run_evaluate(self): + """ + Internal method to run evaluation with proper setup. + """ + await self.setup() + await self.evaluate() + @classmethod def cli(cls): """ Command-line interface entry point for the environment. - This method handles the CLI commands for serve and process. + This method handles the CLI commands for serve, process, and evaluate. """ # Create subcommands dictionary subcommands = { "serve": cls.get_cli_serve_config_cls(), "process": cls.get_cli_process_config_cls(), + "evaluate": cls.get_cli_evaluate_config_cls(), } # Custom exception handler for cleaner error output @@ -1603,3 +1611,251 @@ class BaseEnv(ABC): asyncio.run(env.process_manager()) return CliProcessConfig + + @classmethod + def get_cli_evaluate_config_cls(cls) -> type: + """ + Returns the CLI configuration class for evaluate commands. + + Returns: + type: The CliEvaluateConfig class for evaluate commands. 
+ """ + # Get the default configurations from the specific environment class via config_init + default_env_config_from_init, default_server_configs_from_init = ( + cls.config_init() + ) + + # Define namespace prefixes + env_full_prefix = f"{ENV_NAMESPACE}{NAMESPACE_SEP}" + openai_full_prefix = f"{OPENAI_NAMESPACE}{NAMESPACE_SEP}" + + # Create Pydantic model classes based on the types from config_init. + # The defaults from config_init will be the primary source of defaults. + env_config_cls_from_init = type(default_env_config_from_init) + + # Handle server_configs_from_init appropriately for creating a default CLI model + # If it's a list (multiple servers), we'll take the first one as a template for CLI args, + # or use APIServerConfig if the list is empty or contains ServerBaseline. + # If it's a single APIServerConfig, we use its type. + # If it's ServerBaseline, we use APIServerConfig type for CLI args to allow overrides. + if isinstance(default_server_configs_from_init, list): + if default_server_configs_from_init and isinstance( + default_server_configs_from_init[0], APIServerConfig + ): + openai_config_cls_for_cli = type(default_server_configs_from_init[0]) + # Use the actual instance for default values later if it's a single config + default_openai_config_instance_for_cli = ( + default_server_configs_from_init[0] + if len(default_server_configs_from_init) == 1 + else openai_config_cls_for_cli() + ) + else: + openai_config_cls_for_cli = ( + APIServerConfig # Default to APIServerConfig for CLI definition + ) + default_openai_config_instance_for_cli = APIServerConfig() + elif isinstance(default_server_configs_from_init, APIServerConfig): + openai_config_cls_for_cli = type(default_server_configs_from_init) + default_openai_config_instance_for_cli = default_server_configs_from_init + else: # ServerBaseline or other + openai_config_cls_for_cli = APIServerConfig + default_openai_config_instance_for_cli = APIServerConfig() + + class CliEvaluateConfig( + 
get_prefixed_pydantic_model(env_config_cls_from_init, env_full_prefix), + get_prefixed_pydantic_model(openai_config_cls_for_cli, openai_full_prefix), + ServerManagerConfig, # ServerManagerConfig defaults are fine as is. + Cmd, + ): + """ + Configuration for the evaluate command. + Supports overrides via YAML config file and CLI arguments. + Order of precedence: CLI > YAML > `config_init` defaults. + """ + + config: str | None = Field( + default=None, + description="Path to .yaml config file. CLI args override this.", + ) + + def run(self) -> None: + """The logic to execute for the 'evaluate' command.""" + # Set default wandb name if not provided and class has a name + wandb_name_attr = f"{ENV_NAMESPACE}{NAMESPACE_SEP}wandb_name" + if ( + getattr(self, wandb_name_attr, None) is None + and cls.name is not None + ): + setattr(self, wandb_name_attr, cls.name) + + # Load configuration from YAML file if specified + if self.config is not None: + with open(self.config, "r") as f: + yaml_config = yaml.safe_load(f) + print(f"Loaded config from {self.config}") + else: + yaml_config = {} + + # Get CLI flags passed with double dashes + cli_passed_flags = get_double_dash_flags() + + # --- Configuration Merging --- + # Priority: CLI > YAML > `config_init` defaults + + # 1. Environment Configuration + # Start with defaults from config_init + env_config_dict_base = default_env_config_from_init.model_dump() + # Apply specific overrides for evaluate mode that are generally useful + env_config_dict_base["use_wandb"] = True + + env_config_dict = merge_dicts( + env_config_dict_base, # `config_init` defaults with evaluate adjustments + yaml_config.get(ENV_NAMESPACE, {}), # YAML config + extract_namespace(cli_passed_flags, env_full_prefix), # CLI args + ) + + # 2. 
OpenAI Configuration + oai_cli_passed_args = extract_namespace( + cli_passed_flags, openai_full_prefix + ) # CLI args + yaml_oai_config = yaml_config.get(OPENAI_NAMESPACE, {}) + + # Determine the base OpenAI config from config_init for merging + # This uses the instance we determined earlier for CLI definition defaults + openai_config_dict_base = ( + default_openai_config_instance_for_cli.model_dump() + ) + + if isinstance(default_server_configs_from_init, ServerBaseline) and ( + oai_cli_passed_args or yaml_oai_config + ): + # If config_init provided ServerBaseline, but CLI/YAML provides OpenAI specifics, + # it implies an override intent for a single server. + # We use the default_openai_config_instance_for_cli (which would be a default APIServerConfig) + # as the base for merging, allowing it to be fully specified by YAML/CLI. + pass # Base is already set correctly for this case + + if isinstance(yaml_oai_config, list) and len(yaml_oai_config) == 1: + # If YAML specifies a single server config for OpenAI namespace + yaml_oai_single_server_config = yaml_oai_config[0] + elif isinstance(yaml_oai_config, dict): + yaml_oai_single_server_config = yaml_oai_config + else: + yaml_oai_single_server_config = {} + + openai_config_dict = merge_dicts( + openai_config_dict_base, # Default from config_init (or default APIServerConfig) + yaml_oai_single_server_config, # YAML config for a single server + oai_cli_passed_args, # CLI args + ) + + # 3. 
Server Manager Configuration + server_manager_cli_passed_flags = {} + if "slurm" in cli_passed_flags: + server_manager_cli_passed_flags["slurm"] = cli_passed_flags["slurm"] + if "testing" in cli_passed_flags: + server_manager_cli_passed_flags["testing"] = cli_passed_flags[ + "testing" + ] + + server_manager_yaml_dict = {} + if "slurm" in yaml_config: + server_manager_yaml_dict["slurm"] = yaml_config["slurm"] + if "testing" in yaml_config: + server_manager_yaml_dict["testing"] = yaml_config["testing"] + + # Start with ServerManagerConfig defaults, then apply YAML, then CLI + # For evaluate mode, slurm and testing are typically False unless specified. + server_manager_config_dict_base = ServerManagerConfig( + slurm=False, testing=False + ).model_dump() + + server_manager_config_dict = merge_dicts( + server_manager_config_dict_base, + server_manager_yaml_dict, + server_manager_cli_passed_flags, + ) + + # --- Instantiate Final Config Objects --- + # Use the original class types from config_init (or APIServerConfig for OpenAI CLI) + + env_config = env_config_cls_from_init(**env_config_dict) + server_manager_config = ServerManagerConfig( + **server_manager_config_dict + ) + + # Determine the final server_configs. + # For 'evaluate', we typically expect a single server configuration for the OAI part. + # The resolve_openai_configs will handle complex cases, but for 'evaluate', + # the openai_config_dict we built should represent the single intended server. + + # If default_server_configs_from_init was ServerBaseline, and we have openai_config_dict, + # it means we are overriding to use a specific APIServerConfig. + # If default_server_configs_from_init was a list or single APIServerConfig, + # resolve_openai_configs will merge appropriately. 
+ + final_openai_configs = resolve_openai_configs( + default_server_configs=default_server_configs_from_init, # Pass the original structure + openai_config_dict=openai_config_dict, # This is the merged single server config for CLI/YAML + yaml_config=yaml_config, # Pass full YAML for resolve_openai_configs logic + cli_passed_flags=cli_passed_flags, # Pass full CLI for resolve_openai_configs + logger=logger, + ) + + # Add warning for localhost or 0.0.0.0 + if isinstance(final_openai_configs, list): + for cfg in final_openai_configs: + if ( + isinstance(cfg, APIServerConfig) + and cfg.base_url + and ( + "localhost" in cfg.base_url + or "0.0.0.0" in cfg.base_url + or "127.0.0.1" in cfg.base_url + ) + ): + warnings.warn( + "You are using a local Base URL for an OpenAI compatible server in 'evaluate' mode. " + "Ensure you have a server running at this address or results may not be generated.", + UserWarning, + ) + break # Warn once + elif ( + isinstance(final_openai_configs, APIServerConfig) + and final_openai_configs.base_url + and ( + "localhost" in final_openai_configs.base_url + or "0.0.0.0" in final_openai_configs.base_url + or "127.0.0.1" in final_openai_configs.base_url + ) + ): + warnings.warn( + "You are using a local Base URL for an OpenAI compatible server in 'evaluate' mode. 
" + "Ensure you have a server running at this address or results may not be generated.", + UserWarning, + ) + + rprint(env_config) + rprint(final_openai_configs) + + # --- Create and Run Environment --- + # Create the environment instance + env = cls( + config=env_config, + server_configs=final_openai_configs, + slurm=server_manager_config.slurm, + testing=server_manager_config.testing, + ) + + print("Running evaluation...") + # Handle the case where we might already be in an event loop + try: + loop = asyncio.get_running_loop() + task = loop.create_task(env._run_evaluate()) + loop.run_until_complete(task) + except RuntimeError: + asyncio.run(env._run_evaluate()) + + print("Evaluation completed.") + + return CliEvaluateConfig diff --git a/environments/community/pay_to_play/README.md b/environments/community/pay_to_play/README.md index ef66311d..71434b64 100644 --- a/environments/community/pay_to_play/README.md +++ b/environments/community/pay_to_play/README.md @@ -144,15 +144,15 @@ env = PayToPlayEnv(config, server_configs, testing=True) ```python async def training_loop(): await env.setup() - + for step in range(config.total_steps): # Get next question question = await env.get_next_item() - + # Agent selects agent cards and makes payments # Evaluates response and gets training signal scored_data, _ = await env.collect_trajectories(question) - + # Log metrics await env.wandb_log() ``` @@ -162,7 +162,7 @@ async def training_loop(): ### Pricing Strategy - **Technical Expert ($0.03)**: Premium pricing reflects high accuracy and specialized knowledge -- **Communication Specialist ($0.02)**: Mid-tier pricing for clarity and accessibility focus +- **Communication Specialist ($0.02)**: Mid-tier pricing for clarity and accessibility focus - **Creative Thinker ($0.01)**: Budget option encouraging creativity and innovation ### Budget Scenarios @@ -190,7 +190,7 @@ async def _agent_select_judges(self, question: str) -> JudgeSelection: # 1. 
Agent analyzes question directly (no pre-categorization) # 2. Get agent card performance stats judge_stats = self._get_judge_performance_stats() - + # 3. AI agent makes strategic decision with full context selection_response = await self.server.chat_completion( messages=[ @@ -198,7 +198,7 @@ async def _agent_select_judges(self, question: str) -> JudgeSelection: {"role": "user", "content": selection_prompt} ] ) - + # 4. Validate and execute selection return validated_selection ``` @@ -285,19 +285,19 @@ Set `testing_mode=False` for real USDC payments on Base blockchain: ## ๐ŸŽ–๏ธ Key Features -โœ… **Multiple Specialized Agent Cards**: Different expertise areas and pricing tiers -โœ… **Intelligent Agent Selection**: AI-driven agent card selection with dynamic question analysis -โœ… **Budget Awareness**: Real economic constraints drive efficient learning -โœ… **Performance Tracking**: Historical data informs future decisions -โœ… **Blockchain Integration**: Real USDC payments on Base network -โœ… **Comprehensive Monitoring**: Detailed metrics and decision analysis -โœ… **Fallback Mechanisms**: Robust handling of budget constraints -โœ… **Testing Framework**: Simulation mode for development and testing +โœ… **Multiple Specialized Agent Cards**: Different expertise areas and pricing tiers +โœ… **Intelligent Agent Selection**: AI-driven agent card selection with dynamic question analysis +โœ… **Budget Awareness**: Real economic constraints drive efficient learning +โœ… **Performance Tracking**: Historical data informs future decisions +โœ… **Blockchain Integration**: Real USDC payments on Base network +โœ… **Comprehensive Monitoring**: Detailed metrics and decision analysis +โœ… **Fallback Mechanisms**: Robust handling of budget constraints +โœ… **Testing Framework**: Simulation mode for development and testing ## ๐Ÿšง Future Enhancements - **Dynamic Pricing**: Agent card prices adjust based on demand and performance -- **Agent Card Reputation System**: Community-driven 
agent card quality ratings +- **Agent Card Reputation System**: Community-driven agent card quality ratings - **Multi-Round Evaluation**: Iterative feedback and improvement cycles - **Agent Card Specialization**: More granular specialty categories - **Economic Incentives**: Reward mechanisms for high-performing agent cards @@ -327,4 +327,3 @@ This environment builds upon recent advances in reinforcement learning from AI f - **RLAIF vs. RLHF**: Lee, H., et al. (2023). "RLAIF vs. RLHF: Scaling Reinforcement Learning from Human Feedback with AI Feedback." *arXiv preprint arXiv:2309.00267*. [https://arxiv.org/abs/2309.00267](https://arxiv.org/abs/2309.00267) - **Mixture of Judges**: Xu, T., et al. (2024). "The Perfect Blend: Redefining RLHF with Mixture of Judges." *arXiv preprint arXiv:2409.20370*. [https://arxiv.org/abs/2409.20370](https://arxiv.org/abs/2409.20370) - diff --git a/environments/community/pay_to_play/agent_cards_config.py b/environments/community/pay_to_play/agent_cards_config.py index b753da2e..20844ea2 100644 --- a/environments/community/pay_to_play/agent_cards_config.py +++ b/environments/community/pay_to_play/agent_cards_config.py @@ -9,12 +9,13 @@ Author: OpenBlock Labs License: MIT """ -import yaml -from decimal import Decimal -from typing import Dict, List, Tuple from dataclasses import dataclass +from decimal import Decimal from enum import Enum from pathlib import Path +from typing import Dict, List, Tuple + +import yaml def _load_config(): @@ -22,15 +23,11 @@ def _load_config(): # Deterministic path: from pay_to_play directory to modal/configs config_file = Path(__file__).parent.parent / "configs" / "pay_to_play_modal.yaml" if config_file.exists(): - with open(config_file, 'r') as f: + with open(config_file, "r") as f: return yaml.safe_load(f) - + # Default config if file doesn't exist - return { - "model": { - "name": "microsoft/DialoGPT-small" - } - } + return {"model": {"name": "microsoft/DialoGPT-small"}} # Load config once at module level 
@@ -40,10 +37,11 @@ _CONFIG = _load_config() class AgentCardSpecialty(Enum): """ Agent card specialties for different types of evaluation. - + Each specialty represents a domain of expertise that an agent card can provide. Agent cards can have multiple specialties to handle diverse evaluation needs. """ + TECHNICAL_ACCURACY = "technical_accuracy" CLARITY_COMMUNICATION = "clarity_communication" CREATIVE_THINKING = "creative_thinking" @@ -55,10 +53,10 @@ class AgentCardSpecialty(Enum): class AgentCardConfig: """ Configuration for an agent card (without wallet credentials). - + This class contains all the metadata needed to define an agent card's capabilities, pricing, and evaluation approach. Wallet credentials are kept separate for security. - + Attributes: name: Human-readable name for the agent card price_usd: Cost in USD to use this agent card for one evaluation @@ -67,6 +65,7 @@ class AgentCardConfig: system_prompt: The prompt used to guide this agent card's evaluations model_name: The specific model this agent uses for evaluation """ + name: str price_usd: Decimal specialties: List[AgentCardSpecialty] @@ -100,7 +99,10 @@ AGENT_CARDS_CONFIG: Dict[str, AgentCardConfig] = { "technical_expert": AgentCardConfig( name="Technical Expert", price_usd=Decimal("0.03"), # Premium pricing for specialized expertise - specialties=[AgentCardSpecialty.TECHNICAL_ACCURACY, AgentCardSpecialty.REASONING_LOGIC], + specialties=[ + AgentCardSpecialty.TECHNICAL_ACCURACY, + AgentCardSpecialty.REASONING_LOGIC, + ], description=( "Specialized in technical accuracy, complex reasoning, and factual correctness. " "Excellent for STEM questions, programming challenges, and analytical tasks." @@ -121,9 +123,8 @@ AGENT_CARDS_CONFIG: Dict[str, AgentCardConfig] = { "tags, then provide your evaluation.\n\n" "End with \\boxed{score} where score is between 0.0 and 1.0." 
), - model_name=_teacher_model # Use model from config + model_name=_teacher_model, # Use model from config ), - "communication_specialist": AgentCardConfig( name="Communication Specialist", price_usd=Decimal("0.02"), # Mid-tier pricing for communication focus @@ -146,9 +147,8 @@ AGENT_CARDS_CONFIG: Dict[str, AgentCardConfig] = { "then provide your evaluation.\n\n" "End with \\boxed{score} where score is between 0.0 and 1.0." ), - model_name=_teacher_model # Use model from config + model_name=_teacher_model, # Use model from config ), - "creative_thinker": AgentCardConfig( name="Creative Thinker", price_usd=Decimal("0.01"), # Budget pricing to encourage creative exploration @@ -172,7 +172,7 @@ AGENT_CARDS_CONFIG: Dict[str, AgentCardConfig] = { " tags, then provide your evaluation.\n\n" "End with \\boxed{score} where score is between 0.0 and 1.0." ), - model_name=_teacher_model # Use model from config + model_name=_teacher_model, # Use model from config ), } @@ -180,13 +180,13 @@ AGENT_CARDS_CONFIG: Dict[str, AgentCardConfig] = { def get_agent_card_config(agent_card_id: str) -> AgentCardConfig: """ Get configuration for a specific agent card. - + Args: agent_card_id: The unique identifier for the agent card - + Returns: AgentCardConfig object containing the agent card's configuration - + Raises: ValueError: If the agent_card_id is not found """ @@ -202,26 +202,28 @@ def get_agent_card_config(agent_card_id: str) -> AgentCardConfig: def get_all_agent_card_configs() -> Dict[str, AgentCardConfig]: """ Get all agent card configurations. - + Returns: Dictionary mapping agent card IDs to their configurations """ return AGENT_CARDS_CONFIG.copy() -def get_agent_cards_by_specialty(specialty: AgentCardSpecialty) -> Dict[str, AgentCardConfig]: +def get_agent_cards_by_specialty( + specialty: AgentCardSpecialty, +) -> Dict[str, AgentCardConfig]: """ Get all agent cards that have a specific specialty. 
- + Args: specialty: The specialty to filter by - + Returns: Dictionary of agent card IDs and configs that have the specified specialty """ return { - agent_card_id: config - for agent_card_id, config in AGENT_CARDS_CONFIG.items() + agent_card_id: config + for agent_card_id, config in AGENT_CARDS_CONFIG.items() if specialty in config.specialties } @@ -229,7 +231,7 @@ def get_agent_cards_by_specialty(specialty: AgentCardSpecialty) -> Dict[str, Age def get_cheapest_agent_card() -> Tuple[str, AgentCardConfig]: """ Get the cheapest available agent card. - + Returns: Tuple of (agent_card_id, agent_card_config) for the lowest priced agent card """ @@ -239,7 +241,7 @@ def get_cheapest_agent_card() -> Tuple[str, AgentCardConfig]: def get_most_expensive_agent_card() -> Tuple[str, AgentCardConfig]: """ Get the most expensive available agent card. - + Returns: Tuple of (agent_card_id, agent_card_config) for the highest priced agent card """ @@ -249,7 +251,7 @@ def get_most_expensive_agent_card() -> Tuple[str, AgentCardConfig]: def get_price_range() -> Tuple[Decimal, Decimal]: """ Get the price range of all agent cards. - + Returns: Tuple of (min_price, max_price) across all agent cards """ @@ -260,16 +262,16 @@ def get_price_range() -> Tuple[Decimal, Decimal]: def validate_agent_card_configs() -> None: """ Validate that all agent card configurations are properly formatted. - + This function is called automatically on module import to ensure all agent card configurations are valid. 
- + Raises: ValueError: If any agent card configuration is invalid """ if not AGENT_CARDS_CONFIG: raise ValueError("No agent card configurations defined") - + # Validate each agent card configuration for agent_card_id, config in AGENT_CARDS_CONFIG.items(): try: @@ -277,8 +279,10 @@ def validate_agent_card_configs() -> None: # We just need to access it to trigger validation _ = config.name except Exception as e: - raise ValueError(f"Invalid configuration for agent card '{agent_card_id}': {e}") - + raise ValueError( + f"Invalid configuration for agent card '{agent_card_id}': {e}" + ) + # Ensure we have agent cards across different price points min_price, max_price = get_price_range() if min_price == max_price: @@ -286,4 +290,4 @@ def validate_agent_card_configs() -> None: # Validate configurations on import -validate_agent_card_configs() \ No newline at end of file +validate_agent_card_configs() diff --git a/environments/community/pay_to_play/pay_to_play_env.py b/environments/community/pay_to_play/pay_to_play_env.py index 906ce749..7a2def58 100644 --- a/environments/community/pay_to_play/pay_to_play_env.py +++ b/environments/community/pay_to_play/pay_to_play_env.py @@ -1,11 +1,11 @@ """ Pay-to-Play Environment with Mixture of Judges -A reinforcement learning environment where an AI agent must strategically select and pay -judges before each evaluation, implementing economic constraints and strategic decision-making +A reinforcement learning environment where an AI agent must strategically select and pay +judges before each evaluation, implementing economic constraints and strategic decision-making in AI training. -This environment creates genuine economic pressure by requiring real USDC payments on the +This environment creates genuine economic pressure by requiring real USDC payments on the Base blockchain before each evaluation, encouraging efficient learning and high-quality responses. 
Author: OpenBlock Labs @@ -14,14 +14,20 @@ License: MIT import json import logging +import random import time +from dataclasses import dataclass from decimal import Decimal from pathlib import Path from typing import Dict, List, Optional, Tuple -from dataclasses import dataclass -import random import wandb + +# Import agent card configurations +from agent_cards_config import ( + AgentCardSpecialty, + get_all_agent_card_configs, +) from eth_account import Account from pydantic import Field from web3 import Web3 @@ -34,12 +40,6 @@ from atroposlib.envs.base import ( ) from atroposlib.utils.tokenize_for_trainer import tokenize_for_trainer -# Import agent card configurations -from agent_cards_config import ( - AgentCardSpecialty, - get_all_agent_card_configs, -) - # Blockchain configuration BASE_RPC_URL = "https://mainnet.base.org" BASE_CHAIN_ID = 8453 @@ -52,31 +52,31 @@ USDC_ABI = [ "constant": False, "inputs": [ {"name": "_to", "type": "address"}, - {"name": "_value", "type": "uint256"} + {"name": "_value", "type": "uint256"}, ], "name": "transfer", "outputs": [{"name": "", "type": "bool"}], - "type": "function" + "type": "function", }, { "constant": True, "inputs": [{"name": "_owner", "type": "address"}], "name": "balanceOf", "outputs": [{"name": "balance", "type": "uint256"}], - "type": "function" - } + "type": "function", + }, ] - @dataclass class AgentCardMetadata: """ Metadata for each agent card including pricing, specialties, and wallet info. - + This combines the agent card configuration from agent_cards_config.py with wallet credentials from secrets.json to create a complete agent card instance. 
""" + name: str price_usd: Decimal specialties: List[AgentCardSpecialty] @@ -85,7 +85,7 @@ class AgentCardMetadata: model_name: str address: str private_key: str - + # Performance tracking total_evaluations: int = 0 average_score_given: float = 0.0 @@ -97,10 +97,11 @@ class AgentCardMetadata: class AgentCardSelection: """ Agent's decision about which agent cards to use for evaluation. - + Contains the agent's strategic choice of agent cards along with reasoning and cost analysis for transparency and debugging. """ + selected_agent_cards: List[str] reasoning: str expected_cost: Decimal @@ -111,27 +112,28 @@ class AgentCardSelection: class BudgetTracker: """ Tracks agent spending and budget decisions. - + Provides comprehensive budget management including affordability checks, spending tracking per agent card, and cost analysis over time. """ + initial_budget: Decimal current_balance: Decimal total_spent: Decimal spending_per_agent_card: Dict[str, Decimal] evaluations_count: int average_cost_per_eval: Decimal - + def can_afford(self, cost: Decimal) -> bool: """Check if the agent can afford a given cost.""" return self.current_balance >= cost - + def spend(self, amount: Decimal, agent_card_name: str) -> None: """Record a spending transaction and update budget tracking.""" self.current_balance -= amount self.total_spent += amount self.spending_per_agent_card[agent_card_name] = ( - self.spending_per_agent_card.get(agent_card_name, Decimal('0')) + amount + self.spending_per_agent_card.get(agent_card_name, Decimal("0")) + amount ) self.evaluations_count += 1 if self.evaluations_count > 0: @@ -140,25 +142,24 @@ class BudgetTracker: class PayToPlayConfig(BaseEnvConfig): """Configuration for the Pay-to-Play Environment.""" - + testing_mode: bool = Field( - default=False, - description="If True, simulates payments without real blockchain transactions" + default=False, + description="If True, simulates payments without real blockchain transactions", ) initial_budget_usd: float 
= Field( - default=1.0, - description="Initial budget for the agent in USD" + default=1.0, description="Initial budget for the agent in USD" ) class PayToPlayEnv(BaseEnv): """ Environment that requires crypto payments to multiple agent cards before LLM evaluation. - + The agent must select and pay agent cards before each evaluation, making strategic decisions about cost, quality, and agent card specialties based on budget constraints and past performance. - + Key Features: - Real USDC payments on Base blockchain (or simulated for testing) - Strategic agent card selection based on question analysis @@ -179,7 +180,7 @@ class PayToPlayEnv(BaseEnv): ): """ Initialize the Pay-to-Play environment. - + Args: config: Environment configuration server_configs: API server configurations for LLM inference @@ -192,36 +193,37 @@ class PayToPlayEnv(BaseEnv): self.eval_metrics = [] self.payment_logs = [] self.agent_card_selection_history = [] - + # Initialize Web3 connection self.w3 = Web3(Web3.HTTPProvider(BASE_RPC_URL)) self.usdc_contract = self.w3.eth.contract( - address=USDC_CONTRACT_ADDRESS, - abi=USDC_ABI + address=USDC_CONTRACT_ADDRESS, abi=USDC_ABI ) - + # Load wallet configuration and initialize agent cards wallet_config = self._load_wallet_config() self.agent_cards = self._initialize_agent_cards(wallet_config) - + # Agent wallet setup self.agent_account = Account.from_key(wallet_config["agent"]["private_key"]) - + # Initialize budget tracking initial_budget = Decimal(str(self.config.initial_budget_usd)) self.budget_tracker = BudgetTracker( initial_budget=initial_budget, current_balance=initial_budget, - total_spent=Decimal('0'), + total_spent=Decimal("0"), spending_per_agent_card={}, evaluations_count=0, - average_cost_per_eval=Decimal('0') + average_cost_per_eval=Decimal("0"), ) - + # Testing mode override self.testing_mode = testing or self.config.testing_mode - - logging.info(f"PayToPlay Environment initialized with {len(self.agent_cards)} agent cards") + + 
logging.info( + f"PayToPlay Environment initialized with {len(self.agent_cards)} agent cards" + ) logging.info(f"Agent wallet: {self.agent_account.address}") logging.info(f"Initial budget: ${initial_budget}") logging.info(f"Testing mode: {self.testing_mode}") @@ -229,10 +231,10 @@ class PayToPlayEnv(BaseEnv): def _load_wallet_config(self) -> Dict: """ Load wallet configuration from JSON file. - + Returns: Dictionary containing wallet configuration - + Raises: FileNotFoundError: If secrets.json is not found ValueError: If required wallet configuration is missing @@ -243,46 +245,48 @@ class PayToPlayEnv(BaseEnv): f"Secrets configuration not found: {wallet_file}\n" f"Please copy secrets.json.template to secrets.json and configure your wallets." ) - + try: - with open(wallet_file, 'r') as f: + with open(wallet_file, "r") as f: config = json.load(f) except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON in secrets.json: {e}") - + # Validate required fields if "agent" not in config or "private_key" not in config["agent"]: raise ValueError("Missing agent private_key in wallet configuration") - + return config - def _initialize_agent_cards(self, wallet_config: Dict) -> Dict[str, AgentCardMetadata]: + def _initialize_agent_cards( + self, wallet_config: Dict + ) -> Dict[str, AgentCardMetadata]: """ Initialize the panel of agent cards with different specialties and prices. - + Args: wallet_config: Wallet configuration containing agent card credentials - + Returns: Dictionary mapping agent card IDs to AgentCardMetadata instances - + Raises: ValueError: If agent card configuration or wallet credentials are missing """ agent_cards = {} - + # Get agent card wallet info from config agent_cards_wallet_config = wallet_config.get("agent_cards", {}) - + if not agent_cards_wallet_config: raise ValueError( "No agent cards configuration found in secrets.json. " "Please add agent card wallet addresses using the template." 
) - + # Load agent card configurations from separate config file all_agent_card_configs = get_all_agent_card_configs() - + for agent_card_id, agent_card_config in all_agent_card_configs.items(): # Get wallet credentials for this agent card wallet_info = agent_cards_wallet_config.get(agent_card_id, {}) @@ -291,7 +295,7 @@ class PayToPlayEnv(BaseEnv): f"Missing wallet configuration for agent card '{agent_card_id}' in secrets.json. " f"Please add address and private_key for this agent card." ) - + # Combine agent card config with wallet credentials agent_cards[agent_card_id] = AgentCardMetadata( name=agent_card_config.name, @@ -301,9 +305,9 @@ class PayToPlayEnv(BaseEnv): system_prompt=agent_card_config.system_prompt, model_name=agent_card_config.model_name, address=wallet_info["address"], - private_key=wallet_info["private_key"] + private_key=wallet_info["private_key"], ) - + return agent_cards def _get_agent_card_performance_stats(self) -> Dict[str, Dict]: @@ -311,10 +315,10 @@ class PayToPlayEnv(BaseEnv): stats = {} for agent_card_name, agent_card in self.agent_cards.items(): stats[agent_card_name] = { - 'avg_score': agent_card.average_score_given, - 'consistency': agent_card.consistency_score, - 'satisfaction': agent_card.agent_satisfaction, - 'total_evals': agent_card.total_evaluations + "avg_score": agent_card.average_score_given, + "consistency": agent_card.consistency_score, + "satisfaction": agent_card.agent_satisfaction, + "total_evals": agent_card.total_evaluations, } return stats @@ -354,222 +358,301 @@ class PayToPlayEnv(BaseEnv): async def _check_wallet_balances(self): """Check and log wallet balances for agent and all agent cards.""" try: - agent_balance = self.usdc_contract.functions.balanceOf(self.agent_account.address).call() - agent_balance_usd = agent_balance / (10 ** USDC_DECIMALS) - + agent_balance = self.usdc_contract.functions.balanceOf( + self.agent_account.address + ).call() + agent_balance_usd = agent_balance / (10**USDC_DECIMALS) + 
logging.info(f"Agent USDC balance: ${agent_balance_usd:.6f}") - logging.info(f"Agent budget tracker balance: ${self.budget_tracker.current_balance:.6f}") - + logging.info( + f"Agent budget tracker balance: ${self.budget_tracker.current_balance:.6f}" + ) + for agent_card_name, agent_card in self.agent_cards.items(): try: - agent_card_balance = self.usdc_contract.functions.balanceOf(agent_card.address).call() - agent_card_balance_usd = agent_card_balance / (10 ** USDC_DECIMALS) - logging.info(f"Agent card {agent_card_name} USDC balance: ${agent_card_balance_usd:.6f}") + agent_card_balance = self.usdc_contract.functions.balanceOf( + agent_card.address + ).call() + agent_card_balance_usd = agent_card_balance / (10**USDC_DECIMALS) + logging.info( + f"Agent card {agent_card_name} USDC balance: ${agent_card_balance_usd:.6f}" + ) except Exception as e: - logging.warning(f"Could not check balance for agent card {agent_card_name}: {e}") - + logging.warning( + f"Could not check balance for agent card {agent_card_name}: {e}" + ) + if not self.testing_mode and self.budget_tracker.current_balance <= 0: logging.warning("Agent has no budget remaining!") - + except Exception as e: logging.error(f"Error checking balances: {e}") - async def _make_payments_to_agent_cards(self, selected_agent_cards: List[str]) -> Tuple[bool, Dict[str, Optional[str]]]: + async def _make_payments_to_agent_cards( + self, selected_agent_cards: List[str] + ) -> Tuple[bool, Dict[str, Optional[str]]]: """ Make USDC payments to selected agent cards. 
- + Returns: Tuple of (all_payments_successful, transaction_hashes_by_agent_card) """ - logging.info(f"๐Ÿ’ฐ Starting payment process to {len(selected_agent_cards)} agent cards: {selected_agent_cards}") - + logging.info( + f"๐Ÿ’ฐ Starting payment process to {len(selected_agent_cards)} agent cards: {selected_agent_cards}" + ) + # Log balances before payment try: - agent_balance_before = self.usdc_contract.functions.balanceOf(self.agent_account.address).call() - agent_balance_usd_before = agent_balance_before / (10 ** USDC_DECIMALS) - logging.info(f"๐Ÿ’ณ Agent USDC balance before payments: ${agent_balance_usd_before:.6f}") + agent_balance_before = self.usdc_contract.functions.balanceOf( + self.agent_account.address + ).call() + agent_balance_usd_before = agent_balance_before / (10**USDC_DECIMALS) + logging.info( + f"๐Ÿ’ณ Agent USDC balance before payments: ${agent_balance_usd_before:.6f}" + ) except Exception as e: logging.warning(f"Could not check agent balance: {e}") agent_balance_usd_before = 0 - + if self.testing_mode: - total_cost = sum(self.agent_cards[agent_card_name].price_usd for agent_card_name in selected_agent_cards) - logging.info(f"๐Ÿงช SIMULATED payments totaling ${total_cost} to agent cards: {selected_agent_cards}") - return True, {agent_card_name: None for agent_card_name in selected_agent_cards} - + total_cost = sum( + self.agent_cards[agent_card_name].price_usd + for agent_card_name in selected_agent_cards + ) + logging.info( + f"๐Ÿงช SIMULATED payments totaling ${total_cost} to agent cards: {selected_agent_cards}" + ) + return True, { + agent_card_name: None for agent_card_name in selected_agent_cards + } + tx_hashes = {} successful_payments = 0 - total_paid = Decimal('0') - + total_paid = Decimal("0") + for agent_card_name in selected_agent_cards: agent_card = self.agent_cards[agent_card_name] - payment_amount_usdc = int(agent_card.price_usd * (10 ** USDC_DECIMALS)) - + payment_amount_usdc = int(agent_card.price_usd * (10**USDC_DECIMALS)) + try: - 
logging.info(f"๐Ÿ’ธ Making REAL payment of ${agent_card.price_usd} to {agent_card_name} ({agent_card.address})") - + logging.info( + f"๐Ÿ’ธ Making REAL payment of ${agent_card.price_usd} to {agent_card_name} ({agent_card.address})" + ) + # Check balance - balance = self.usdc_contract.functions.balanceOf(self.agent_account.address).call() + balance = self.usdc_contract.functions.balanceOf( + self.agent_account.address + ).call() if balance < payment_amount_usdc: - balance_usd = balance / (10 ** USDC_DECIMALS) - logging.error(f"โŒ Insufficient USDC balance for {agent_card_name}: ${balance_usd:.6f} < ${agent_card.price_usd}") + balance_usd = balance / (10**USDC_DECIMALS) + logging.error( + f"โŒ Insufficient USDC balance for {agent_card_name}: ${balance_usd:.6f} < ${agent_card.price_usd}" + ) tx_hashes[agent_card_name] = None continue - + # Build and send transaction transfer_function = self.usdc_contract.functions.transfer( - agent_card.address, - payment_amount_usdc + agent_card.address, payment_amount_usdc ) - + gas_price = self.w3.eth.gas_price nonce = self.w3.eth.get_transaction_count(self.agent_account.address) - gas_estimate = transfer_function.estimate_gas({'from': self.agent_account.address}) - - transaction = transfer_function.build_transaction({ - 'from': self.agent_account.address, - 'gas': gas_estimate, - 'gasPrice': gas_price, - 'nonce': nonce, - }) - - signed_txn = self.w3.eth.account.sign_transaction(transaction, self.agent_account.key) + gas_estimate = transfer_function.estimate_gas( + {"from": self.agent_account.address} + ) + + transaction = transfer_function.build_transaction( + { + "from": self.agent_account.address, + "gas": gas_estimate, + "gasPrice": gas_price, + "nonce": nonce, + } + ) + + signed_txn = self.w3.eth.account.sign_transaction( + transaction, self.agent_account.key + ) tx_hash = self.w3.eth.send_raw_transaction(signed_txn.raw_transaction) - + logging.info(f"๐Ÿ“ก Transaction sent, waiting for confirmation...") - + # Wait for 
confirmation receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash, timeout=120) - + if receipt.status == 1: tx_hash_hex = tx_hash.hex() - logging.info(f"โœ… Payment to {agent_card_name} successful: ${agent_card.price_usd}") - logging.info(f"๐Ÿ”— Transaction: https://basescan.org/tx/{tx_hash_hex}") + logging.info( + f"โœ… Payment to {agent_card_name} successful: ${agent_card.price_usd}" + ) + logging.info( + f"๐Ÿ”— Transaction: https://basescan.org/tx/{tx_hash_hex}" + ) tx_hashes[agent_card_name] = tx_hash_hex successful_payments += 1 total_paid += agent_card.price_usd else: - logging.error(f"โŒ Payment to {agent_card_name} failed - transaction reverted") + logging.error( + f"โŒ Payment to {agent_card_name} failed - transaction reverted" + ) tx_hashes[agent_card_name] = None - + except Exception as e: logging.error(f"โŒ Payment to {agent_card_name} failed: {e}") tx_hashes[agent_card_name] = None - + # Log balances after payment try: - agent_balance_after = self.usdc_contract.functions.balanceOf(self.agent_account.address).call() - agent_balance_usd_after = agent_balance_after / (10 ** USDC_DECIMALS) - logging.info(f"๐Ÿ’ณ Agent USDC balance after payments: ${agent_balance_usd_after:.6f}") - logging.info(f"๐Ÿ’ฐ Total paid: ${total_paid} | Balance change: ${agent_balance_usd_before - agent_balance_usd_after:.6f}") + agent_balance_after = self.usdc_contract.functions.balanceOf( + self.agent_account.address + ).call() + agent_balance_usd_after = agent_balance_after / (10**USDC_DECIMALS) + logging.info( + f"๐Ÿ’ณ Agent USDC balance after payments: ${agent_balance_usd_after:.6f}" + ) + logging.info( + f"๐Ÿ’ฐ Total paid: ${total_paid} | Balance change: ${agent_balance_usd_before - agent_balance_usd_after:.6f}" + ) except Exception as e: logging.warning(f"Could not check agent balance after payment: {e}") - + all_successful = successful_payments == len(selected_agent_cards) - logging.info(f"๐Ÿ“Š Payment summary: {successful_payments}/{len(selected_agent_cards)} 
successful") + logging.info( + f"๐Ÿ“Š Payment summary: {successful_payments}/{len(selected_agent_cards)} successful" + ) return all_successful, tx_hashes - async def collect_trajectories(self, item) -> Tuple[Optional[ScoredDataGroup], List]: + async def collect_trajectories( + self, item + ) -> Tuple[Optional[ScoredDataGroup], List]: """Collect trajectories and score them after strategic agent card selection and payment.""" question = item - + # Agent selects agent cards strategically try: selection = await self._agent_select_agent_cards(question) except RuntimeError as e: - logging.warning(f"โญ๏ธ Skipping episode due to agent card selection failure: {e}") + logging.warning( + f"โญ๏ธ Skipping episode due to agent card selection failure: {e}" + ) return None, [] - + # Check budget if not self.budget_tracker.can_afford(selection.expected_cost): - logging.error(f"โญ๏ธ Skipping episode: Insufficient budget for evaluation. Need ${selection.expected_cost}, have ${self.budget_tracker.current_balance}") + logging.error( + f"โญ๏ธ Skipping episode: Insufficient budget for evaluation. Need ${selection.expected_cost}, have ${self.budget_tracker.current_balance}" + ) return None, [] - + # Log selection decision - self.agent_card_selection_history.append({ - "timestamp": time.time(), - "question": question[:50] + "..." if len(question) > 50 else question, - "selected_agent_cards": selection.selected_agent_cards, - "reasoning": selection.reasoning, - "expected_cost": float(selection.expected_cost), - "question_type": selection.question_type - }) - + self.agent_card_selection_history.append( + { + "timestamp": time.time(), + "question": question[:50] + "..." 
if len(question) > 50 else question, + "selected_agent_cards": selection.selected_agent_cards, + "reasoning": selection.reasoning, + "expected_cost": float(selection.expected_cost), + "question_type": selection.question_type, + } + ) + logging.info(f"Agent selected agent cards: {selection.selected_agent_cards}") logging.info(f"Selection reasoning: {selection.reasoning}") logging.info(f"Expected cost: ${selection.expected_cost}") - + # Generate responses - logging.info(f"๐Ÿค– Generating {self.config.group_size} responses for question: {question[:100]}{'...' if len(question) > 100 else ''}") - + logging.info( + f"๐Ÿค– Generating {self.config.group_size} responses for question: {question[:100]}{'...' if len(question) > 100 else ''}" + ) + async def generate_responses(): return await self.server.chat_completion( messages=[ - {"role": "system", "content": "You are a helpful AI assistant. Provide clear, accurate, and helpful responses."}, - {"role": "user", "content": question} + { + "role": "system", + "content": "You are a helpful AI assistant. Provide clear, accurate, and helpful responses.", + }, + {"role": "user", "content": question}, ], n=self.config.group_size, max_tokens=self.config.max_token_length, ) - + try: chat_completions = await generate_responses() except Exception as e: logging.error(f"โŒ Failed to generate responses: {e}") raise RuntimeError(f"Response generation failed: {e}") - + responses = [] for i, completion in enumerate(chat_completions.choices): response_text = completion.message.content - responses.append({ - "question": question, - "response": response_text, - "finish_reason": completion.finish_reason - }) + responses.append( + { + "question": question, + "response": response_text, + "finish_reason": completion.finish_reason, + } + ) # Log each agent generation - logging.info(f"๐Ÿค– Agent Generation {i+1}: {response_text[:200]}{'...' 
if len(response_text) > 200 else ''}") - + logging.info( + f"๐Ÿค– Agent Generation {i+1}: {response_text[:200]}{'...' if len(response_text) > 200 else ''}" + ) + logging.info(f"โœ… Generated {len(responses)} responses for evaluation") - + # Make payments to selected agent cards - payment_success, tx_hashes = await self._make_payments_to_agent_cards(selection.selected_agent_cards) - + payment_success, tx_hashes = await self._make_payments_to_agent_cards( + selection.selected_agent_cards + ) + # Log payment attempts for agent_card_name in selection.selected_agent_cards: agent_card_price = self.agent_cards[agent_card_name].price_usd success = tx_hashes.get(agent_card_name) is not None - self.payment_logs.append({ - "timestamp": time.time(), - "agent_card_name": agent_card_name, - "success": success, - "tx_hash": tx_hashes.get(agent_card_name), - "amount_usd": float(agent_card_price), - "question": question[:50] + "..." if len(question) > 50 else question - }) - + self.payment_logs.append( + { + "timestamp": time.time(), + "agent_card_name": agent_card_name, + "success": success, + "tx_hash": tx_hashes.get(agent_card_name), + "amount_usd": float(agent_card_price), + "question": ( + question[:50] + "..." 
if len(question) > 50 else question + ), + } + ) + # Update budget tracker for successful payments if success: self.budget_tracker.spend(agent_card_price, agent_card_name) - + if not payment_success: logging.error("Some payments failed - STOPPING TRAINING") failed_agent_cards = [j for j, tx in tx_hashes.items() if tx is None] raise RuntimeError(f"Payments failed to agent cards: {failed_agent_cards}") - + # Evaluate responses with selected agent cards - scored_data = await self._score_with_selected_agent_cards(responses, selection.selected_agent_cards) + scored_data = await self._score_with_selected_agent_cards( + responses, selection.selected_agent_cards + ) return scored_data, [] - async def _score_with_selected_agent_cards(self, responses, selected_agent_cards: List[str]) -> Optional[ScoredDataGroup]: + async def _score_with_selected_agent_cards( + self, responses, selected_agent_cards: List[str] + ) -> Optional[ScoredDataGroup]: """Score responses using the strategically selected agent cards.""" all_scores = [] agent_card_feedback = {} - + for agent_card_name in selected_agent_cards: agent_card = self.agent_cards[agent_card_name] - - logging.info(f"๐Ÿง‘โ€โš–๏ธ Agent card {agent_card_name} evaluating {len(responses)} responses...") - + + logging.info( + f"๐Ÿง‘โ€โš–๏ธ Agent card {agent_card_name} evaluating {len(responses)} responses..." + ) + # Evaluate each response with this agent card agent_card_scores = [] for i, response_data in enumerate(responses): @@ -582,100 +665,116 @@ Please evaluate the quality, accuracy, and helpfulness of this response based on Provide a score between 0.0 and 1.0, where 1.0 is excellent and 0.0 is poor. End your evaluation with \\boxed{{score}} where score is your numerical rating. 
""" - + # Get agent card evaluation async def get_agent_card_evaluation(): return await self.server.chat_completion( messages=[ {"role": "system", "content": agent_card.system_prompt}, - {"role": "user", "content": eval_prompt} + {"role": "user", "content": eval_prompt}, ], n=1, max_tokens=self.config.max_token_length, - split="eval" + split="eval", ) - + try: agent_card_completion = await get_agent_card_evaluation() except Exception as e: - logging.error(f"โŒ Agent card {agent_card_name} evaluation failed for response {i+1}: {e}") + logging.error( + f"โŒ Agent card {agent_card_name} evaluation failed for response {i+1}: {e}" + ) # Use fallback score if agent card evaluation fails score = 0.5 agent_card_response = f"Evaluation failed: {e}" else: # Extract score from agent card response - agent_card_response = agent_card_completion.choices[0].message.content + agent_card_response = agent_card_completion.choices[ + 0 + ].message.content score = self._extract_score_from_agent_card(agent_card_response) - + agent_card_scores.append(score) - + # Log detailed agent card feedback logging.info(f" ๐Ÿ“ Response {i+1} Score: {score:.3f}") - logging.info(f" ๐Ÿ’ฌ Agent card Feedback: {agent_card_response[:300]}{'...' if len(agent_card_response) > 300 else ''}") - + logging.info( + f" ๐Ÿ’ฌ Agent card Feedback: {agent_card_response[:300]}{'...' 
if len(agent_card_response) > 300 else ''}" + ) + # Update agent card statistics agent_card.total_evaluations += 1 - agent_card.average_score_given = (agent_card.average_score_given * (agent_card.total_evaluations - 1) + score) / agent_card.total_evaluations - - logging.info(f"๐Ÿง‘โ€โš–๏ธ Agent card {agent_card_name} completed evaluation - Average score: {sum(agent_card_scores)/len(agent_card_scores):.3f}") - + agent_card.average_score_given = ( + agent_card.average_score_given * (agent_card.total_evaluations - 1) + + score + ) / agent_card.total_evaluations + + logging.info( + f"๐Ÿง‘โ€โš–๏ธ Agent card {agent_card_name} completed evaluation - Average score: {sum(agent_card_scores)/len(agent_card_scores):.3f}" + ) + all_scores.append(agent_card_scores) agent_card_feedback[agent_card_name] = { "scores": agent_card_scores, "average": sum(agent_card_scores) / len(agent_card_scores), - "price": float(agent_card.price_usd) + "price": float(agent_card.price_usd), } - + # Aggregate scores from multiple agent cards (average) if not all_scores: return None - + num_responses = len(responses) aggregated_scores = [] - + for i in range(num_responses): response_scores = [agent_card_scores[i] for agent_card_scores in all_scores] avg_score = sum(response_scores) / len(response_scores) aggregated_scores.append(avg_score) - logging.info(f"๐Ÿ“Š Response {i+1} Final Score: {avg_score:.3f} (from {len(response_scores)} agent cards)") - - logging.info(f"๐ŸŽฏ Evaluation Summary: Scores range {min(aggregated_scores):.3f} - {max(aggregated_scores):.3f}, Average: {sum(aggregated_scores)/len(aggregated_scores):.3f}") - + logging.info( + f"๐Ÿ“Š Response {i+1} Final Score: {avg_score:.3f} (from {len(response_scores)} agent cards)" + ) + + logging.info( + f"๐ŸŽฏ Evaluation Summary: Scores range {min(aggregated_scores):.3f} - {max(aggregated_scores):.3f}, Average: {sum(aggregated_scores)/len(aggregated_scores):.3f}" + ) + # Create scored data scores = ScoredDataGroup() scores["tokens"] = [] 
scores["masks"] = [] scores["scores"] = [] - + for i, response_data in enumerate(responses): # Tokenize for trainer messages = [ - {"role": "system", "content": "You are a helpful AI assistant. Provide clear, accurate, and helpful responses."}, - {"role": "user", "content": response_data['question']}, - {"role": "assistant", "content": response_data['response']} + { + "role": "system", + "content": "You are a helpful AI assistant. Provide clear, accurate, and helpful responses.", + }, + {"role": "user", "content": response_data["question"]}, + {"role": "assistant", "content": response_data["response"]}, ] - + out_dict = tokenize_for_trainer( - self.tokenizer, - messages, - response_data['finish_reason'] + self.tokenizer, messages, response_data["finish_reason"] ) - + scores["tokens"].append(out_dict["tokens"]) scores["masks"].append(out_dict["masks"]) scores["scores"].append(aggregated_scores[i]) - + # Track for metrics self.percent_correct_buffer.append(aggregated_scores[i]) - + # Store agent card feedback for analysis - if hasattr(self, 'last_agent_card_feedback'): + if hasattr(self, "last_agent_card_feedback"): self.last_agent_card_feedback = agent_card_feedback - + # Ensure we have different scores for training signal if len(set(scores["scores"])) == 1: return None - + return scores def _extract_score_from_agent_card(self, agent_card_response: str) -> float: @@ -689,17 +788,18 @@ End your evaluation with \\boxed{{score}} where score is your numerical rating. 
score_str = agent_card_response[start:end].strip() score = float(score_str) return max(0.0, min(1.0, score)) # Clamp to [0, 1] - + # Fallback: look for decimal numbers import re - numbers = re.findall(r'\b0\.\d+\b|\b1\.0\b', agent_card_response) + + numbers = re.findall(r"\b0\.\d+\b|\b1\.0\b", agent_card_response) if numbers: score = float(numbers[-1]) return max(0.0, min(1.0, score)) - + except (ValueError, IndexError): pass - + # If agent card can't provide a valid score, treat as failure return 0.0 @@ -707,58 +807,64 @@ End your evaluation with \\boxed{{score}} where score is your numerical rating. """Run evaluation on test questions.""" dataset = self._load_questions_dataset() eval_questions = [q["text"] for q in dataset["evaluation_questions"]] - + total_score = 0 count = 0 - agent_card_performance = {agent_card_name: [] for agent_card_name in self.agent_cards.keys()} - + agent_card_performance = { + agent_card_name: [] for agent_card_name in self.agent_cards.keys() + } + for question in eval_questions: completion = await self.server.chat_completion( messages=[ {"role": "system", "content": "You are a helpful AI assistant."}, - {"role": "user", "content": question} + {"role": "user", "content": question}, ], n=1, max_tokens=self.config.max_token_length, temperature=0.0, - split="eval" + split="eval", ) - + response = completion.choices[0].message.content - + # Evaluate with each agent card (no payment required for eval) question_scores = [] for agent_card_name, agent_card in self.agent_cards.items(): eval_prompt = f"Question: {question}\n\nResponse: {response}\n\nEvaluate this response based on your expertise:" - + agent_card_completion = await self.server.chat_completion( messages=[ {"role": "system", "content": agent_card.system_prompt}, - {"role": "user", "content": eval_prompt} + {"role": "user", "content": eval_prompt}, ], n=1, max_tokens=self.config.max_token_length, - split="eval" + split="eval", + ) + + score = self._extract_score_from_agent_card( + 
agent_card_completion.choices[0].message.content ) - - score = self._extract_score_from_agent_card(agent_card_completion.choices[0].message.content) question_scores.append(score) agent_card_performance[agent_card_name].append(score) - + # Average score across all agent cards for this question avg_score = sum(question_scores) / len(question_scores) total_score += avg_score count += 1 - + if count > 0: overall_avg_score = total_score / count self.eval_metrics.append(("eval/average_score", overall_avg_score)) - + # Add per-agent card evaluation metrics for agent_card_name, scores in agent_card_performance.items(): if scores: agent_card_avg = sum(scores) / len(scores) - self.eval_metrics.append((f"eval/agent_card_{agent_card_name}_avg_score", agent_card_avg)) + self.eval_metrics.append( + (f"eval/agent_card_{agent_card_name}_avg_score", agent_card_avg) + ) async def get_next_item(self): """Get next question for training.""" @@ -770,98 +876,159 @@ End your evaluation with \\boxed{{score}} where score is your numerical rating. 
"""Log metrics to Weights & Biases.""" if wandb_metrics is None: wandb_metrics = {} - + # Log budget and spending metrics - wandb_metrics["budget/current_balance"] = float(self.budget_tracker.current_balance) + wandb_metrics["budget/current_balance"] = float( + self.budget_tracker.current_balance + ) wandb_metrics["budget/total_spent"] = float(self.budget_tracker.total_spent) - wandb_metrics["budget/evaluations_count"] = self.budget_tracker.evaluations_count - wandb_metrics["budget/average_cost_per_eval"] = float(self.budget_tracker.average_cost_per_eval) - + wandb_metrics["budget/evaluations_count"] = ( + self.budget_tracker.evaluations_count + ) + wandb_metrics["budget/average_cost_per_eval"] = float( + self.budget_tracker.average_cost_per_eval + ) + # Budget utilization percentage - budget_utilization = float(self.budget_tracker.total_spent / self.budget_tracker.initial_budget) * 100 + budget_utilization = ( + float(self.budget_tracker.total_spent / self.budget_tracker.initial_budget) + * 100 + ) wandb_metrics["budget/utilization_percent"] = budget_utilization - + # Per-agent card spending breakdown - for agent_card_name, amount in self.budget_tracker.spending_per_agent_card.items(): - wandb_metrics[f"spending/agent_card_{agent_card_name}_total"] = float(amount) + for ( + agent_card_name, + amount, + ) in self.budget_tracker.spending_per_agent_card.items(): + wandb_metrics[f"spending/agent_card_{agent_card_name}_total"] = float( + amount + ) if self.budget_tracker.total_spent > 0: percentage = float(amount / self.budget_tracker.total_spent) * 100 - wandb_metrics[f"spending/agent_card_{agent_card_name}_percent"] = percentage - + wandb_metrics[f"spending/agent_card_{agent_card_name}_percent"] = ( + percentage + ) + # Agent card performance metrics for agent_card_name, agent_card in self.agent_cards.items(): - wandb_metrics[f"agent_card_performance/{agent_card_name}_avg_score"] = agent_card.average_score_given - 
wandb_metrics[f"agent_card_performance/{agent_card_name}_total_evals"] = agent_card.total_evaluations - wandb_metrics[f"agent_card_performance/{agent_card_name}_satisfaction"] = agent_card.agent_satisfaction - wandb_metrics[f"agent_card_performance/{agent_card_name}_consistency"] = agent_card.consistency_score - wandb_metrics[f"agent_card_performance/{agent_card_name}_price_usd"] = float(agent_card.price_usd) - + wandb_metrics[f"agent_card_performance/{agent_card_name}_avg_score"] = ( + agent_card.average_score_given + ) + wandb_metrics[f"agent_card_performance/{agent_card_name}_total_evals"] = ( + agent_card.total_evaluations + ) + wandb_metrics[f"agent_card_performance/{agent_card_name}_satisfaction"] = ( + agent_card.agent_satisfaction + ) + wandb_metrics[f"agent_card_performance/{agent_card_name}_consistency"] = ( + agent_card.consistency_score + ) + wandb_metrics[f"agent_card_performance/{agent_card_name}_price_usd"] = ( + float(agent_card.price_usd) + ) + # Payment statistics if self.payment_logs: - successful_payments = sum(1 for log in self.payment_logs if log['success']) + successful_payments = sum(1 for log in self.payment_logs if log["success"]) total_payments = len(self.payment_logs) - total_cost = sum(log['amount_usd'] for log in self.payment_logs if log['success']) - - wandb_metrics["payments/success_rate"] = successful_payments / total_payments if total_payments > 0 else 0 + total_cost = sum( + log["amount_usd"] for log in self.payment_logs if log["success"] + ) + + wandb_metrics["payments/success_rate"] = ( + successful_payments / total_payments if total_payments > 0 else 0 + ) wandb_metrics["payments/total_cost_usd"] = total_cost wandb_metrics["payments/total_attempts"] = total_payments - + # Agent card selection frequency agent_card_selections = {} for log in self.payment_logs: - if log['success']: - agent_card_name = log['agent_card_name'] - agent_card_selections[agent_card_name] = agent_card_selections.get(agent_card_name, 0) + 1 - + if 
log["success"]: + agent_card_name = log["agent_card_name"] + agent_card_selections[agent_card_name] = ( + agent_card_selections.get(agent_card_name, 0) + 1 + ) + for agent_card_name, selection_count in agent_card_selections.items(): - wandb_metrics[f"selection_frequency/{agent_card_name}"] = selection_count + wandb_metrics[f"selection_frequency/{agent_card_name}"] = ( + selection_count + ) if successful_payments > 0: - wandb_metrics[f"selection_frequency/{agent_card_name}_percent"] = (selection_count / successful_payments) * 100 - + wandb_metrics[f"selection_frequency/{agent_card_name}_percent"] = ( + selection_count / successful_payments + ) * 100 + # Create payment log table if len(self.payment_logs) > 0: - table = wandb.Table(columns=["timestamp", "agent_card_name", "success", "tx_hash", "amount_usd"]) + table = wandb.Table( + columns=[ + "timestamp", + "agent_card_name", + "success", + "tx_hash", + "amount_usd", + ] + ) for log in self.payment_logs[-10:]: # Last 10 payments table.add_data( - log['timestamp'], - log['agent_card_name'], - log['success'], - log.get('tx_hash', 'N/A'), - float(log['amount_usd']) if log['success'] else 0 + log["timestamp"], + log["agent_card_name"], + log["success"], + log.get("tx_hash", "N/A"), + float(log["amount_usd"]) if log["success"] else 0, ) wandb_metrics["payments/recent_transactions"] = table - + self.payment_logs = [] # Clear logs - + # Agent card selection history if self.agent_card_selection_history: # Create selection history table - selection_table = wandb.Table(columns=["timestamp", "question", "selected_agent_cards", "reasoning", "cost", "question_type"]) - for selection in self.agent_card_selection_history[-10:]: # Last 10 selections + selection_table = wandb.Table( + columns=[ + "timestamp", + "question", + "selected_agent_cards", + "reasoning", + "cost", + "question_type", + ] + ) + for selection in self.agent_card_selection_history[ + -10: + ]: # Last 10 selections selection_table.add_data( - 
selection['timestamp'], - selection['question'], - ', '.join(selection['selected_agent_cards']), - selection['reasoning'][:100] + "..." if len(selection['reasoning']) > 100 else selection['reasoning'], - selection['expected_cost'], - selection['question_type'] + selection["timestamp"], + selection["question"], + ", ".join(selection["selected_agent_cards"]), + ( + selection["reasoning"][:100] + "..." + if len(selection["reasoning"]) > 100 + else selection["reasoning"] + ), + selection["expected_cost"], + selection["question_type"], ) wandb_metrics["agent_decisions/agent_card_selections"] = selection_table - + # Selection strategy analysis question_types = {} for selection in self.agent_card_selection_history: - q_type = selection['question_type'] + q_type = selection["question_type"] question_types[q_type] = question_types.get(q_type, 0) + 1 - + for q_type, count in question_types.items(): wandb_metrics[f"question_analysis/{q_type}_count"] = count - + self.agent_card_selection_history = [] # Clear history # Training performance metrics if self.percent_correct_buffer: - wandb_metrics["train/percent_correct"] = sum(self.percent_correct_buffer) / len(self.percent_correct_buffer) + wandb_metrics["train/percent_correct"] = sum( + self.percent_correct_buffer + ) / len(self.percent_correct_buffer) self.percent_correct_buffer = [] # Evaluation metrics @@ -877,19 +1044,25 @@ End your evaluation with \\boxed{{score}} where score is your numerical rating. 
data = {} data["iter"] = self.iter data["agent_address"] = self.agent_account.address - data["agent_card_addresses"] = {agent_card_name: agent_card.address for agent_card_name, agent_card in self.agent_cards.items()} + data["agent_card_addresses"] = { + agent_card_name: agent_card.address + for agent_card_name, agent_card in self.agent_cards.items() + } data["budget_tracker"] = { "current_balance": float(self.budget_tracker.current_balance), "total_spent": float(self.budget_tracker.total_spent), "evaluations_count": self.budget_tracker.evaluations_count, - "spending_per_agent_card": {k: float(v) for k, v in self.budget_tracker.spending_per_agent_card.items()} + "spending_per_agent_card": { + k: float(v) + for k, v in self.budget_tracker.spending_per_agent_card.items() + }, } data["agent_card_performance"] = { agent_card_name: { "total_evaluations": agent_card.total_evaluations, "average_score_given": agent_card.average_score_given, "agent_satisfaction": agent_card.agent_satisfaction, - "consistency_score": agent_card.consistency_score + "consistency_score": agent_card.consistency_score, } for agent_card_name, agent_card in self.agent_cards.items() } @@ -899,16 +1072,18 @@ End your evaluation with \\boxed{{score}} where score is your numerical rating. 
"""Agent makes strategic decision about which agent cards to hire.""" # Get agent card performance history agent_card_stats = self._get_agent_card_performance_stats() - + # Analyze the question to understand its requirements question_analysis = self._analyze_question_requirements(question) - + # Create a much simpler selection prompt agent_cards_list = [] for agent_card_name, agent_card in self.agent_cards.items(): agent_card_specialties = [s.value for s in agent_card.specialties] - agent_cards_list.append(f"{agent_card_name}: ${agent_card.price_usd} ({', '.join(agent_card_specialties)})") - + agent_cards_list.append( + f"{agent_card_name}: ${agent_card.price_usd} ({', '.join(agent_card_specialties)})" + ) + selection_prompt = f"""Question: "{question}" Budget: ${self.budget_tracker.current_balance:.2f} @@ -923,14 +1098,17 @@ Select 1-2 agent cards by ID. Respond with JSON: async def get_agent_selection(): return await self.server.chat_completion( messages=[ - {"role": "system", "content": "You are a strategic AI agent. Select agent cards for evaluation. Respond only with valid JSON."}, - {"role": "user", "content": selection_prompt} + { + "role": "system", + "content": "You are a strategic AI agent. Select agent cards for evaluation. Respond only with valid JSON.", + }, + {"role": "user", "content": selection_prompt}, ], n=1, max_tokens=200, # Much smaller limit for simple response - temperature=0.1 + temperature=0.1, ) - + try: selection_response = await get_agent_selection() except Exception as e: @@ -942,51 +1120,63 @@ Select 1-2 agent cards by ID. 
Respond with JSON: selected_agent_cards=[fallback_agent_card], reasoning="Fallback selection due to agent failure", expected_cost=self.agent_cards[fallback_agent_card].price_usd, - question_type=question_analysis.get('category', 'General') + question_type=question_analysis.get("category", "General"), ) - + # Parse response selection_text = selection_response.choices[0].message.content logging.info(f"🤖 Agent selection response: {selection_text}") - + try: - import re import json - + import re + # Extract JSON from response - json_match = re.search(r'\{.*\}', selection_text, re.DOTALL) + json_match = re.search(r"\{.*\}", selection_text, re.DOTALL) if json_match: selection_data = json.loads(json_match.group()) selected_names = selection_data.get("selected_agent_cards", []) - + # Validate selections - valid_selections = [name for name in selected_names if name in self.agent_cards] - + valid_selections = [ + name for name in selected_names if name in self.agent_cards + ] + if valid_selections: - total_cost = sum(self.agent_cards[name].price_usd for name in valid_selections) - + total_cost = sum( + self.agent_cards[name].price_usd for name in valid_selections + ) + if self.budget_tracker.can_afford(total_cost): - logging.info(f"✅ Selected agent cards: {valid_selections} for ${total_cost}") + logging.info( + f"✅ Selected agent cards: {valid_selections} for ${total_cost}" + ) return AgentCardSelection( selected_agent_cards=valid_selections, - reasoning=selection_data.get("reasoning", "Agent selection"), + reasoning=selection_data.get( + "reasoning", "Agent selection" + ), expected_cost=total_cost, - question_type=question_analysis.get('category', 'General') + question_type=question_analysis.get("category", "General"), ) else: - logging.warning(f"⚠️ Selection too expensive: ${total_cost} > ${self.budget_tracker.current_balance}") - + logging.warning( + f"⚠️ Selection too expensive: ${total_cost} > ${self.budget_tracker.current_balance}" + ) + except Exception
as e: logging.error(f"❌ Failed to parse agent response: {e}") - + # Fallback to cheapest agent card - cheapest_agent_card = min(self.agent_cards.keys(), key=lambda j: self.agent_cards[j].price_usd) + cheapest_agent_card = min( + self.agent_cards.keys(), key=lambda j: self.agent_cards[j].price_usd + ) logging.info(f"🔄 Using cheapest agent card fallback: {cheapest_agent_card}") return AgentCardSelection( selected_agent_cards=[cheapest_agent_card], reasoning="Fallback to cheapest agent card", expected_cost=self.agent_cards[cheapest_agent_card].price_usd, - question_type=question_analysis.get('category', 'General') + question_type=question_analysis.get("category", "General"), ) def _load_questions_dataset(self) -> Dict: @@ -994,35 +1184,45 @@ Select 1-2 agent cards by ID. Respond with JSON: questions_file = Path(__file__).parent / "questions.json" if not questions_file.exists(): raise FileNotFoundError(f"Questions dataset not found: {questions_file}") - - with open(questions_file, 'r') as f: + + with open(questions_file, "r") as f: return json.load(f) def _analyze_question_requirements(self, question_text: str) -> Dict[str, any]: """Analyze a question to determine what specialties might be needed.""" dataset = self._load_questions_dataset() - + # Search for question in dataset question_data = None for q in dataset["training_questions"] + dataset["evaluation_questions"]: if q["text"] == question_text: question_data = q break - + if question_data: # Calculate complexity score based on specialties and difficulty - difficulty_multiplier = {"basic": 1, "intermediate": 2, "advanced": 3, "expert": 4} - complexity_score = len(question_data["expected_specialties"]) * difficulty_multiplier.get(question_data["difficulty"], 2) - + difficulty_multiplier = { + "basic": 1, + "intermediate": 2, + "advanced": 3, + "expert": 4, + } + complexity_score = len( + question_data["expected_specialties"] + ) * difficulty_multiplier.get(question_data["difficulty"], 2) + return { "category":
question_data["category"], "difficulty": question_data["difficulty"], "expected_specialties": question_data["expected_specialties"], "description": question_data["description"], - "requires_multiple_agent_cards": len(question_data["expected_specialties"]) > 1, - "complexity_score": complexity_score + "requires_multiple_agent_cards": len( + question_data["expected_specialties"] + ) + > 1, + "complexity_score": complexity_score, } - + # Fallback for unknown questions return { "category": "unknown", @@ -1030,9 +1230,9 @@ Select 1-2 agent cards by ID. Respond with JSON: "expected_specialties": ["general"], "description": "Unknown question type", "requires_multiple_agent_cards": False, - "complexity_score": 2 + "complexity_score": 2, } if __name__ == "__main__": - PayToPlayEnv.cli() \ No newline at end of file + PayToPlayEnv.cli() diff --git a/environments/community/pay_to_play/requirements.txt b/environments/community/pay_to_play/requirements.txt index 06ba5bba..c530ef82 100644 --- a/environments/community/pay_to_play/requirements.txt +++ b/environments/community/pay_to_play/requirements.txt @@ -1,3 +1,3 @@ web3>=6.0.0 eth-account>=0.8.0 -base58>=2.1.0 \ No newline at end of file +base58>=2.1.0 diff --git a/environments/community/pay_to_play/secrets.json.template b/environments/community/pay_to_play/secrets.json.template index 65ea1291..a541234e 100644 --- a/environments/community/pay_to_play/secrets.json.template +++ b/environments/community/pay_to_play/secrets.json.template @@ -20,4 +20,4 @@ "network": "base", "chain_id": 8453, "usdc_contract": "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" -} \ No newline at end of file +}