diff --git a/environments/community/README.md b/environments/community/README.md index b40a8aa2..9ea7dc5c 100644 --- a/environments/community/README.md +++ b/environments/community/README.md @@ -2894,6 +2894,207 @@ python generate_humor_dataset.py --- +### 30. Meteorology Forecast RL Environment (`meteorology_forecast/`) + +**Contributors**: FahrenheitResearch, drewsny +**PR**: [#68](https://github.com/NousResearch/atropos/pull/68) +**Integration Status**: ✅ Integrated + +**Description**: A reinforcement learning environment designed to train LLMs on interpreting numerical weather prediction (NWP) model sounding data and making informed meteorological forecast assessments. The environment moves beyond static graphical outputs to a text-structured, LLM-readable format that enables programmatic reasoning and analysis of weather data. + +**Core Features**: + +**NWP Model Data Processing**: +- **Real Weather Data Integration**: Uses actual numerical weather prediction model sounding data (RAP, HRRR) +- **Multi-Location Support**: Processes weather data from multiple geographical locations +- **Time Series Analysis**: Analyzes forecast data across multiple UTC time periods (6, 9, 12, 15, 18 hours) +- **Area Forecast Discussion (AFD) Integration**: Incorporates human forecaster discussions for evaluation context + +**Meteorological Reasoning Framework**: +- **Three-Phase Analysis**: Detailed reasoning, tool calling, and forecast summarization +- **Conceptual Tool Integration**: Available tools include surface observations, radar imagery, satellite data, and upper-air soundings +- **Severe Weather Focus**: Specialized assessment of severe weather potential and risks +- **Professional Format**: Output format matches professional meteorological analysis standards + +**Dual LLM Architecture**: +- **Agent LLM**: Analyzes sounding data and generates forecasts (default: Qwen/Qwen3-8B) +- **Judge LLM**: Expert meteorologist evaluation using Gemini-2.5-Flash-Preview via OpenRouter +- **Separate API Endpoints**: Independent configuration for agent and judge models +- **Comprehensive Scoring**: 10-point scale evaluation across multiple meteorological criteria + +**Expert Evaluation System**: +- **Meteorological Soundness** (0-5 points): Correct interpretation of sounding parameters, logical weather connections, depth of analysis +- **Tool Call Relevance** (0-3 points): Appropriate tool usage given model data and reasoning +- **Forecast Summary Quality** (0-2 points): Clarity, conciseness, alignment with reasoning and AFDs +- **Professional Justification**: Detailed textual feedback on forecast quality + +**Technical Implementation**: + +**Data Structure and Processing**: +- **JSONL Sounding Data**: Structured format optimized for LLM consumption +- **Pattern Matching**: Automated discovery of sounding files by location and time +- **AFD Text Processing**: Area Forecast Discussion integration with encoding handling +- **Case Generation**: Systematic creation of forecast scenarios with target times + +**Environment Configuration**: +```python +sounding_data_root: str = "environments/community/meteorology_forecast/data/" +target_date: str = "20250314" # YYYYMMDD format +judge_model_name: str = "google/gemini-2.5-flash-preview" +nwp_models_to_use: List[str] = ["RAP"] +forecast_hours_to_sample: List[int] = [6, 9, 12, 15, 18] +max_reasoning_tokens_llm: int = 3000 +max_tokens_judge: int = 2000 +``` + +**Agent System Prompt**: +The environment instructs the agent to: +1. 
Provide detailed step-by-step meteorological reasoning +2. Identify trends in atmospheric parameters and connect them to weather phenomena +3. Call conceptual tools when additional observational data would improve assessment +4. Generate professional forecast summaries using "FORECAST_SUMMARY:" format + +**Judge Evaluation Process**: +1. **Input Analysis**: Receives agent output and relevant human forecaster AFDs +2. **Multi-Criteria Assessment**: Evaluates reasoning quality, tool appropriateness, and forecast clarity +3. **Structured Scoring**: Provides numerical scores in standardized format +4. **Professional Justification**: Detailed explanation of scoring decisions + +**Training and Evaluation Workflow**: + +**Data Collection Loop**: +- **Case Sampling**: Random selection from available weather scenarios +- **Prompt Generation**: Dynamic creation of location-specific forecast prompts +- **Agent Inference**: LLM analysis of sounding data with reasoning and tool calls +- **Judge Evaluation**: Expert assessment of agent performance +- **Score Integration**: Tokenization and score assignment for RL training + +**WandB Metrics Tracking**: +- `train/avg_judge_total_score`: Overall forecast quality (0-10 scale) +- `train/avg_judge_reasoning_score`: Depth and accuracy of reasoning (0-5) +- `train/avg_judge_tool_score`: Tool usage relevance (0-3) +- `train/avg_judge_forecast_score`: Forecast clarity and alignment (0-2) +- `train/detailed_rollouts`: Comprehensive logging of prompts, reasoning, tools, and justifications + +**Research Applications**: + +**Meteorological AI Development**: +- **Professional Weather Analysis**: Training AI systems for operational meteorology +- **Decision Support Systems**: AI assistance for human forecasters during severe weather +- **Automated Forecast Generation**: Custom forecasts for arbitrary geographic locations +- **Meteorological Education**: Teaching weather analysis and forecasting principles + +**Multi-Modal Reasoning**: +- **Tool-Augmented Analysis**: Learning when and how to request additional observational data +- **Contextual Decision Making**: Integrating model data with human forecaster insights +- **Structured Output Generation**: Professional-format meteorological communication +- **Domain Expertise Transfer**: Incorporating specialized meteorological knowledge + +**Real-World Integration Potential**: +- **National Weather Service Integration**: Complementing operational forecast workflows +- **Emergency Management**: Enhanced severe weather warning systems +- **Aviation Meteorology**: Specialized forecasts for flight planning and safety +- **Agricultural Applications**: Crop-specific weather analysis and forecasting + +**Data Requirements**: + +**Sounding Data Format**: +- **Location Structure**: `data/YYYYMMDD/{location_id}/` +- **File Pattern**: `{location_id}_{model}_{timestamp}.jsonl` +- **AFD Files**: `AFD_*.txt` for human forecaster context +- **JSONL Format**: Structured atmospheric profile data optimized for LLM processing + +**Example Data Structure**: +``` +environments/community/meteorology_forecast/data/ +└── 20250314/ + ├── KOKC/ # Oklahoma City + │ ├── KOKC_RAP_20250314_12Z.buf_default_llm_optimized.jsonl + │ ├── AFD_OUN.txt + │ └── ... + └── KORD/ # Chicago O'Hare + ├── KORD_RAP_20250314_12Z.buf_default_llm_optimized.jsonl + ├── AFD_LOT.txt + └── ... 
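+```
+
+The precise JSONL schema is not spelled out in this README; the record below is a hypothetical sketch of a single sounding entry, shown only to illustrate the "LLM-optimized" structured format. The `tm` model run-time field mirrors the field referenced by the environment code; every other field name and value here is an assumption.
+
+```json
+{"tm": "14/12Z", "station": "KOKC", "model": "RAP", "levels": [{"pres_hpa": 850, "hght_m": 1457, "tmpc": 12.4, "dwpc": 8.1, "wdir_deg": 210, "wspd_kt": 35}]}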
+``` + +**Setup and Usage**: + +**Environment Variables**: +- `AGENT_LLM_MODEL_NAME`: Agent model selection (default: Qwen/Qwen3-8B) +- `AGENT_LLM_API_KEY`: API key for agent model +- `AGENT_LLM_BASE_URL`: Base URL for agent model API +- `OPENROUTER_API_KEY`: Required for judge model (Gemini-2.5-Flash-Preview) + +**Command Line Usage**: +```bash +# Set up required API keys +export AGENT_LLM_API_KEY="your_agent_api_key" +export OPENROUTER_API_KEY="your_openrouter_api_key" + +# Run meteorology forecast environment +python environments/community/meteorology_forecast/meteorology_env.py serve \ + --env.group_size 2 \ + --env.use_wandb True \ + --env.target_date 20250314 \ + --openai.api_key $AGENT_LLM_API_KEY \ + --openai.base_url http://localhost:8080/v1 \ + --openai.model_name Qwen/Qwen3-8B +``` + +**Performance Characteristics**: + +**Computational Requirements**: +- **Agent Model**: Qwen/Qwen3-8B or similar (configurable) +- **Judge Model**: Gemini-2.5-Flash-Preview via OpenRouter API +- **Memory Usage**: Moderate (depends on sounding data volume) +- **Processing Time**: Variable based on number of locations and time periods + +**Training Metrics**: +- **Episode Length**: Variable based on available weather cases +- **Reward Signal**: Expert judge scores (0-10 scale) +- **Evaluation Frequency**: Configurable steps per evaluation (default: 100) +- **Data Throughput**: Thousands of location-specific soundings per model run + +**Demo and Results**: +- **W&B Dashboard**: [Example training run](https://wandb.ai/fahrenheitagi-fahrenheitagi/my_atropos_rl_experiments/runs/dsubhw9i/overview) +- **Performance Tracking**: Real-time monitoring of forecast quality improvements +- **Detailed Logging**: Complete conversation histories with expert evaluations + +**Future Enhancements**: + +**Extended Weather Data**: +- **Additional NWP Models**: HRRR, GFS, NAM integration +- **Satellite Data**: Direct integration of satellite imagery analysis +- **Radar Data**: Real-time radar interpretation capabilities +- **Ensemble Forecasting**: Multi-model consensus analysis + +**Advanced Meteorological Features**: +- **Mesoscale Analysis**: High-resolution weather pattern recognition +- **Climate Integration**: Long-term climate data context +- **Specialized Domains**: Marine, aviation, agricultural meteorology +- **Real-Time Integration**: Live weather data processing + +**Professional Applications**: +- **Forecast Verification**: Automated accuracy assessment +- **Warning Systems**: Severe weather alert generation +- **Briefing Generation**: Automated meteorological briefings +- **Educational Tools**: Interactive weather analysis training + +**Research Impact**: This environment represents a significant advancement in applying AI to meteorological analysis, providing a framework for training language models on real weather data with expert-level evaluation. The integration of professional meteorological workflows with RL training opens new possibilities for AI-assisted weather forecasting. + +**Educational Value**: The environment serves as an excellent example of domain-specific RL applications, demonstrating how specialized knowledge can be incorporated into AI training through expert evaluation systems and structured data formats. + +**Limitations**: +- **Data Dependency**: Requires access to NWP model sounding data +- **Expert Evaluation Cost**: Judge model API calls for evaluation +- **Domain Specificity**: Focused on meteorological applications +- **Real-Time Constraints**: Historical data training vs. 
operational forecasting + +**Requirements**: wandb, pydantic, httpx, atroposlib + +--- + ## Support For questions or issues with community environments: diff --git a/environments/hack0/README.md b/environments/community/meteorology_forecast/README.md similarity index 100% rename from environments/hack0/README.md rename to environments/community/meteorology_forecast/README.md diff --git a/environments/hack0/meteorology_forecast_env.py b/environments/community/meteorology_forecast/meteorology_env.py similarity index 84% rename from environments/hack0/meteorology_forecast_env.py rename to environments/community/meteorology_forecast/meteorology_env.py index 5b0ff66d..481c59f1 100644 --- a/environments/hack0/meteorology_forecast_env.py +++ b/environments/community/meteorology_forecast/meteorology_env.py @@ -9,9 +9,9 @@ from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Tuple -import wandb from pydantic import Field +import wandb from atroposlib.envs.base import ( APIServer, APIServerConfig, @@ -37,7 +37,9 @@ class MetRLConfig(BaseEnvConfig): max_token_length: int = Field(default=2048) inference_weight: float = Field(default=1.0) wandb_name: Optional[str] = Field(default=None) - data_path_to_save_groups: Optional[str] = Field(default="data/MeteorologyForecastRL.jsonl") + data_path_to_save_groups: Optional[str] = Field( + default="data/MeteorologyForecastRL.jsonl" + ) eval_handling: EvalHandlingEnum = Field(default=EvalHandlingEnum.STOP_TRAIN) eval_limit_ratio: float = Field(default=0.5) num_eval_samples: int = Field(default=20) @@ -52,11 +54,12 @@ class MetRLConfig(BaseEnvConfig): max_batches_offpolicy: int = Field(default=3) sounding_data_root: str = Field( - default="/Users/hackathon/atropos/environments/hack0/data/", + default="environments/community/meteorology_forecast/data/", description="Root directory for all sounding and AFD data.", ) target_date: str = Field( - default="20250314", description="The specific date to load data for (YYYYMMDD format)." + default="20250314", + description="The specific date to load data for (YYYYMMDD format).", ) judge_model_name: str = Field( default="google/gemini-2.5-flash-preview", @@ -82,7 +85,8 @@ class MetRLConfig(BaseEnvConfig): description="Offset from the latest provided sounding hour to set the target forecast time.", ) max_afds_for_judge: int = Field( - default=3, description="Maximum number of AFD files to provide to the judge model." + default=3, + description="Maximum number of AFD files to provide to the judge model.", ) max_reasoning_tokens_llm: int = Field( default=3000, description="Max tokens for the agent LLM's generation." @@ -92,25 +96,35 @@ class MetRLConfig(BaseEnvConfig): ) -AGENT_SYSTEM_PROMPT = """You are a highly skilled AI meteorologist. Your task is to analyze numerical weather prediction (NWP) model sounding data for a specific location and time period. +AGENT_SYSTEM_PROMPT = """You are a highly skilled AI meteorologist. Your task is to analyze +numerical weather prediction (NWP) model sounding data for a specific location and time period. Based on your analysis, you must: -1. Provide a detailed step-by-step reasoning process. This should include identifying trends, interpreting meteorological parameters, and connecting them to potential weather phenomena. -2. If you determine that additional real-time observational data is crucial for a more accurate assessment, specify the tools you would use. 
For each tool, output a line in the exact format: TOOL_CALL: {{"tool_name": "tool_name_here", "arguments": {{"param1": "value1", ...}}}} - Available conceptual tools: get_surface_observations, get_latest_radar_imagery, get_satellite_imagery, get_upper_air_sounding. -3. Conclude with a concise forecast summary for the specified target time. Start this summary with "FORECAST_SUMMARY: ". +1. Provide a detailed step-by-step reasoning process. This should include identifying trends, + interpreting meteorological parameters, and connecting them to potential weather phenomena. +2. If you determine that additional real-time observational data is crucial for a more accurate + assessment, specify the tools you would use. For each tool, output a line in the exact format: + TOOL_CALL: {{"tool_name": "tool_name_here", "arguments": {{"param1": "value1", ...}}}} + Available conceptual tools: get_surface_observations, get_latest_radar_imagery, + get_satellite_imagery, get_upper_air_sounding. +3. Conclude with a concise forecast summary for the specified target time. Start this summary + with "FORECAST_SUMMARY: ". Analyze the provided data thoroughly. Your reasoning should be comprehensive.""" AGENT_USER_PROMPT_TEMPLATE = """Please analyze the following NWP model sounding data for station {location_id}. -The soundings provided are from the {model_name} model, run on {run_date_full_z}, valid at the following UTC times: {sounding_times_str}. -Your goal is to make a preliminary forecast assessment focusing on severe weather potential for {location_id} around {target_forecast_time_utc}. +The soundings provided are from the {model_name} model, run on {run_date_full_z}, valid at the +following UTC times: {sounding_times_str}. +Your goal is to make a preliminary forecast assessment focusing on severe weather potential for +{location_id} around {target_forecast_time_utc}. Sounding Data: {soundings_json_blob} -Remember to include your reasoning, any TOOL_CALL: {{"tool_name": "tool_name_here", "arguments": {{"param1": "value1", ...}}}} lines, and a final FORECAST_SUMMARY: statement.""" +Remember to include your reasoning, any TOOL_CALL: {{"tool_name": "tool_name_here", +"arguments": {{"param1": "value1", ...}}}} lines, and a final FORECAST_SUMMARY: statement.""" -JUDGE_SYSTEM_PROMPT = """You are an expert meteorologist acting as a judge. You will evaluate an AI assistant's analysis of model sounding data. +JUDGE_SYSTEM_PROMPT = """You are an expert meteorologist acting as a judge. You will evaluate +an AI assistant's analysis of model sounding data. The AI was asked to provide reasoning, call tools if necessary, and give a forecast summary. You will be given the AI's output and relevant Area Forecast Discussions (AFDs) from human forecasters for context. @@ -126,9 +140,11 @@ Your evaluation should focus on: * Were critical tool calls missed? 3. **Forecast Summary Quality (0-2 points):** * Clarity and conciseness. - * Alignment with the AI's own reasoning and the provided AFDs (or sensible deviation if model data strongly suggested it). + * Alignment with the AI's own reasoning and the provided AFDs (or sensible deviation if model + data strongly suggested it). -Provide a numerical score for each category and a total score (max 10.0). Also, provide a brief overall justification for your scores. +Provide a numerical score for each category and a total score (max 10.0). Also, provide a brief +overall justification for your scores. 
Your output MUST be in the following exact format: REASONING_SCORE: {{{{0-5 score}}}} TOOL_CALL_SCORE: {{{{0-3 score}}}} @@ -146,7 +162,8 @@ Contextual Area Forecast Discussions (AFDs): {afds_blob} --- -Please evaluate the AI assistant's output based on the criteria and provide your scores and justification in the specified format.""" +Please evaluate the AI assistant's output based on the criteria and provide your scores and +justification in the specified format.""" @dataclass @@ -196,9 +213,13 @@ class MeteorologyForecastRLEnv(BaseEnv): @classmethod def config_init(cls) -> Tuple[MetRLConfig, List[APIServerConfig]]: env_config = MetRLConfig() - agent_model_name = os.environ.get("AGENT_LLM_MODEL_NAME", env_config.tokenizer_name) + agent_model_name = os.environ.get( + "AGENT_LLM_MODEL_NAME", env_config.tokenizer_name + ) agent_api_key = os.environ.get("AGENT_LLM_API_KEY", "EMPTY_KEY_IF_LOCAL_VLLM") - agent_base_url = os.environ.get("AGENT_LLM_BASE_URL", "http://localhost:8080/v1") + agent_base_url = os.environ.get( + "AGENT_LLM_BASE_URL", "http://localhost:8080/v1" + ) judge_api_key = os.environ.get(env_config.judge_api_key_env_var) if not judge_api_key: logging.warning( @@ -206,7 +227,9 @@ class MeteorologyForecastRLEnv(BaseEnv): ) server_configs = [ APIServerConfig( - model_name=agent_model_name, base_url=agent_base_url, api_key=agent_api_key + model_name=agent_model_name, + base_url=agent_base_url, + api_key=agent_api_key, ), APIServerConfig( model_name=env_config.judge_model_name, @@ -247,7 +270,9 @@ class MeteorologyForecastRLEnv(BaseEnv): soundings.append(data) sounding_times.append(f"{line_hour:02d}00Z") found_hours.add(line_hour) - if len(found_hours) == len(self.config.forecast_hours_to_sample): + if len(found_hours) == len( + self.config.forecast_hours_to_sample + ): break if len(found_hours) == len(self.config.forecast_hours_to_sample): break @@ -258,12 +283,19 @@ class MeteorologyForecastRLEnv(BaseEnv): sounding_times = [p[0] for p in pairs] soundings = [p[1] for p in pairs] afd_texts = [] - for afd_path in sorted(loc.glob("AFD_*.txt"))[: self.config.max_afds_for_judge]: + for afd_path in sorted(loc.glob("AFD_*.txt"))[ + : self.config.max_afds_for_judge + ]: with open(afd_path, encoding="utf-8", errors="replace") as f: - afd_texts.append("".join(c for c in f.read() if c.isprintable() or c.isspace())) + afd_texts.append( + "".join(c for c in f.read() if c.isprintable() or c.isspace()) + ) latest_hour = int(sounding_times[-1][:2]) target_hour = latest_hour + self.config.target_forecast_hour_offset - target_time = f"{target_hour:02d}00Z on {self.config.target_date[4:6]}/{self.config.target_date[6:8]}/{self.config.target_date[0:4]}" + target_time = ( + f"{target_hour:02d}00Z on {self.config.target_date[4:6]}/" + f"{self.config.target_date[6:8]}/{self.config.target_date[0:4]}" + ) run_time = soundings[0].get("tm", "00/00Z").split("/")[1][:2] + "Z" run_date_full_z = f"{self.config.target_date} at {run_time}" case = CaseData( @@ -293,7 +325,9 @@ class MeteorologyForecastRLEnv(BaseEnv): @staticmethod def _parse_llm_output(text: str) -> Dict[str, Any]: - think_match = re.search(r"(.*?)", text, re.DOTALL | re.IGNORECASE) + think_match = re.search( + r"(.*?)", text, re.DOTALL | re.IGNORECASE + ) think_content = think_match.group(1).strip() if think_match else "" forecast_summary = "" tool_calls: List[Dict[str, Any]] = [] @@ -315,7 +349,12 @@ class MeteorologyForecastRLEnv(BaseEnv): @staticmethod def _parse_judge_output(text: str) -> Tuple[float, Dict[str, float], str]: - scores = 
{"reasoning": 0.0, "tool_call": 0.0, "forecast_summary": 0.0, "total": 0.0} + scores = { + "reasoning": 0.0, + "tool_call": 0.0, + "forecast_summary": 0.0, + "total": 0.0, + } for key in scores: match = re.search(rf"{key.upper()}_SCORE:\s*([0-9.]+)", text) if match: @@ -373,7 +412,12 @@ class MeteorologyForecastRLEnv(BaseEnv): ] outputs = await self._call_agent(agent_messages) - group: ScoredDataGroup = {"tokens": [], "masks": [], "scores": [], "overrides": []} + group: ScoredDataGroup = { + "tokens": [], + "masks": [], + "scores": [], + "overrides": [], + } for llm_output in outputs: parsed = self._parse_llm_output(llm_output) @@ -381,7 +425,9 @@ class MeteorologyForecastRLEnv(BaseEnv): judge_prompt = JUDGE_USER_PROMPT_TEMPLATE.format( llm_full_output=llm_output, afds_blob=( - "\n\n---\n\n".join(item.afd_texts) if item.afd_texts else "No AFDs provided." + "\n\n---\n\n".join(item.afd_texts) + if item.afd_texts + else "No AFDs provided." ), ) judge_messages = [ @@ -389,7 +435,9 @@ class MeteorologyForecastRLEnv(BaseEnv): {"role": "user", "content": judge_prompt}, ] judge_out = await self._call_judge(judge_messages) - final_score, judge_scores, justification = self._parse_judge_output(judge_out) + final_score, judge_scores, justification = self._parse_judge_output( + judge_out + ) self.judge_scores_buffer.append(final_score) tokenized = tokenize_for_trainer( @@ -450,7 +498,9 @@ class MeteorologyForecastRLEnv(BaseEnv): llm_output = await self._call_agent(agent_messages) judge_prompt = JUDGE_USER_PROMPT_TEMPLATE.format( llm_full_output=llm_output, - afds_blob="\n\n---\n\n".join(case.afd_texts) if case.afd_texts else "No AFDs.", + afds_blob=( + "\n\n---\n\n".join(case.afd_texts) if case.afd_texts else "No AFDs." + ), ) judge_messages = [ {"role": "system", "content": JUDGE_SYSTEM_PROMPT}, @@ -464,9 +514,9 @@ class MeteorologyForecastRLEnv(BaseEnv): if metrics is None: metrics = {} if self.judge_scores_buffer: - metrics["train/avg_judge_total_score"] = sum(self.judge_scores_buffer) / len( + metrics["train/avg_judge_total_score"] = sum( self.judge_scores_buffer - ) + ) / len(self.judge_scores_buffer) self.judge_scores_buffer.clear() if self.eval_scores_buffer: avg_total = sum(x["total"] for x in self.eval_scores_buffer) / len( @@ -494,4 +544,4 @@ class MeteorologyForecastRLEnv(BaseEnv): if __name__ == "__main__": - MeteorologyForecastRLEnv.cli() \ No newline at end of file + MeteorologyForecastRLEnv.cli() diff --git a/environments/hack0/requirements.txt b/environments/community/meteorology_forecast/requirements.txt similarity index 94% rename from environments/hack0/requirements.txt rename to environments/community/meteorology_forecast/requirements.txt index 8e9e9c05..f79fbdf6 100644 --- a/environments/hack0/requirements.txt +++ b/environments/community/meteorology_forecast/requirements.txt @@ -3,4 +3,4 @@ pydantic httpx # atroposlib (Optional, if you have it available and want full integration) # If atroposlib is a local package, it would not be listed here -# or would be installed via `pip install -e .` if it's a setuptools project. \ No newline at end of file +# or would be installed via `pip install -e .` if it's a setuptools project. diff --git a/environments/hack0/sharppy_sounding.png b/environments/community/meteorology_forecast/sharppy_sounding.png similarity index 100% rename from environments/hack0/sharppy_sounding.png rename to environments/community/meteorology_forecast/sharppy_sounding.png