diff --git a/example_trainer/vllm_api_server.py b/example_trainer/vllm_api_server.py
index 09c84514..4a126ed2 100644
--- a/example_trainer/vllm_api_server.py
+++ b/example_trainer/vllm_api_server.py
@@ -711,80 +711,154 @@ async def bridge_disable() -> JSONResponse:
     return JSONResponse({"status": "ok"})
 
 
+def _worker_export_cuda_ipc_handles(self) -> dict:
+    """
+    Worker-side function to export CUDA IPC handles.
+
+    This function runs INSIDE the vLLM worker process where the model lives.
+    Called via collective_rpc - 'self' is the GPU worker instance.
+
+    Returns:
+        Dictionary with IPC handles for all model parameters.
+    """
+    import base64
+    import pickle
+
+    model = self.model_runner.model
+
+    ipc_handles = {}
+    failed_params = []
+
+    for name, param in model.named_parameters():
+        try:
+            if not param.is_cuda:
+                failed_params.append(f"{name}: not on CUDA")
+                continue
+
+            storage = param.data.storage()
+            handle = storage._share_cuda_()
+
+            handle_bytes = pickle.dumps(handle)
+            handle_b64 = base64.b64encode(handle_bytes).decode('ascii')
+
+            ipc_handles[name] = {
+                "ipc_handle": handle_b64,
+                "shape": list(param.shape),
+                "dtype": str(param.dtype),
+                "device_index": param.device.index if param.device.index is not None else 0,
+                "storage_offset": param.storage_offset(),
+                "numel": param.numel(),
+                "stride": list(param.stride()),
+            }
+        except Exception as e:
+            failed_params.append(f"{name}: {str(e)}")
+
+    return {
+        "handles": ipc_handles,
+        "failed": failed_params,
+        "model_class": model.__class__.__name__,
+    }
+
+
 @app.post("/bridge/export_cuda_ipc")
 async def bridge_export_cuda_ipc() -> JSONResponse:
     """
     Export CUDA IPC handles for all model parameters.
 
-    This enables TRUE shared memory between vLLM and the trainer.
-    The trainer can reconstruct tensors from these handles without
-    allocating new GPU memory - they share the exact same memory!
+    Uses collective_rpc to execute on the worker process where the model lives;
+    in vLLM v1 the API process holds no direct reference to the model.
 
     IMPORTANT: Only works when both processes are on the SAME GPU.
 
     Returns:
         JSON with IPC handles, shapes, dtypes for each parameter.
     """
-    import base64
-    import pickle
-
     assert engine is not None
 
     try:
-        # Access the underlying model
-        model = engine.engine.model_executor.driver_worker.model_runner.model
+        rpc_result = None
+        method_used = None
 
-        ipc_handles = {}
-        for name, param in model.named_parameters():
+        # Try to find collective_rpc in AsyncLLM's internals.
+        # vLLM v1 exposes it on various paths depending on version.
+
+        paths_to_try = [
+            ("engine", lambda: engine.collective_rpc(_worker_export_cuda_ipc_handles)),
+            ("engine.llm_engine", lambda: engine.llm_engine.collective_rpc(_worker_export_cuda_ipc_handles)),
+            ("engine.engine_core", lambda: engine.engine_core.collective_rpc(_worker_export_cuda_ipc_handles)),
+            ("engine.llm_engine.model_executor", lambda: engine.llm_engine.model_executor.collective_rpc(_worker_export_cuda_ipc_handles)),
+        ]
+
+        for name, rpc_fn in paths_to_try:
             try:
-                # Get the underlying storage
-                storage = param.data.storage()
-
-                # Get CUDA IPC handle - this is the key to shared memory!
-                # The handle can be sent to another process on the same GPU
-                # to reconstruct a tensor pointing to the SAME memory
-                handle = storage._share_cuda_()
-
-                # Encode handle for JSON transmission
-                handle_bytes = pickle.dumps(handle)
-                handle_b64 = base64.b64encode(handle_bytes).decode('ascii')
-
-                ipc_handles[name] = {
-                    "ipc_handle": handle_b64,
-                    "shape": list(param.shape),
-                    "dtype": str(param.dtype),
-                    "device_index": param.device.index,
-                    "storage_offset": param.storage_offset(),
-                    "numel": param.numel(),
-                    "stride": list(param.stride()),
-                }
+                logger.info(f"Trying {name}.collective_rpc...")
+                rpc_result = rpc_fn()
+                method_used = name
+                logger.info(f"SUCCESS via {name}")
+                break
+            except AttributeError as e:
+                logger.debug(f"{name} failed: {e}")
+                continue
             except Exception as e:
-                logger.warning(f"Could not export IPC handle for {name}: {e}")
+                logger.warning(f"{name} error: {e}")
                 continue
 
-        # Save to file for trainer to read
+        if rpc_result is None:
+            # Log engine structure for debugging
+            logger.error("collective_rpc not found. Engine structure:")
+            logger.error(f"  type: {type(engine).__name__}")
+            logger.error(f"  attrs: {[a for a in dir(engine) if not a.startswith('_')][:20]}")
+
+            raise HTTPException(
+                status_code=500,
+                detail=(
+                    "collective_rpc not available on AsyncLLM in this vLLM version. "
+                    "Options: 1) Use vllm_sync_server.py for CUDA IPC, "
+                    "2) Use multi-GPU mode (separate GPUs for trainer/vLLM), "
+                    "3) Use legacy or LoRA mode."
+                )
+            )
+
+        # Process result (collective_rpc returns a list, one per worker)
+        result = rpc_result[0] if isinstance(rpc_result, list) else rpc_result
+        ipc_handles = result.get("handles", {})
+        failed_params = result.get("failed", [])
+
+        if failed_params:
+            logger.warning(f"Could not export {len(failed_params)} parameters")
+
+        if len(ipc_handles) == 0:
+            raise HTTPException(status_code=500, detail="No IPC handles exported")
+
+        # Save to file
         log_dir = os.environ.get("LOGDIR", ".")
         ipc_path = Path(log_dir) / "cuda_ipc_handles.json"
         with open(ipc_path, "w") as f:
             json.dump({
                 "handles": ipc_handles,
-                "model": getattr(engine, "model_config", {}).get("model", "unknown"),
+                "model_class": result.get("model_class", "unknown"),
                 "device_count": torch.cuda.device_count(),
                 "export_time": time.time(),
+                "method": method_used,
             }, f, indent=2)
 
-        logger.info(f"Exported {len(ipc_handles)} CUDA IPC handles to {ipc_path}")
+        logger.info(f"Exported {len(ipc_handles)} CUDA IPC handles via {method_used}")
 
         return JSONResponse({
             "status": "ok",
             "num_parameters": len(ipc_handles),
             "ipc_path": str(ipc_path),
             "total_params": sum(info["numel"] for info in ipc_handles.values()),
+            "method": method_used,
         })
 
+    except HTTPException:
+        raise
     except Exception as e:
         logger.error(f"Failed to export CUDA IPC handles: {e}")
+        import traceback
+        logger.error(traceback.format_exc())
         raise HTTPException(status_code=500, detail=str(e))
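
To exercise the new endpoint end to end, a minimal client-side sketch follows. The host/port and the use of `requests` are illustrative assumptions; the response keys match the `JSONResponse` built in the diff above.

```python
# Sketch: trigger the export and inspect the result.
# Assumes the server is reachable at localhost:8000 (illustrative only)
# and that LOGDIR points somewhere the trainer can also read.
import requests

resp = requests.post("http://localhost:8000/bridge/export_cuda_ipc")
resp.raise_for_status()
info = resp.json()
print(f"exported {info['num_parameters']} parameters "
      f"({info['total_params']} elements) via {info['method']}")
print(f"handles written to {info['ipc_path']}")
```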
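The trainer-side counterpart is not part of this diff, so here is a hedged sketch of how the exported handles could be turned back into live tensors. It assumes the `cuda_ipc_handles.json` layout written above and leans on `torch.UntypedStorage._new_shared_cuda`, the private PyTorch API that `torch.multiprocessing` itself uses to rebuild shared CUDA storages; both processes should pin matching PyTorch builds before relying on it.

```python
# Sketch: rebuild tensors that alias vLLM's GPU memory (same GPU only).
# The JSON layout matches what bridge_export_cuda_ipc writes; everything
# else here is illustrative, not an API this diff provides.
import base64
import json
import pickle

import torch

def rebuild_shared_tensor(entry: dict) -> torch.Tensor:
    # Unpickle the tuple produced by storage._share_cuda_() in the worker.
    handle = pickle.loads(base64.b64decode(entry["ipc_handle"]))
    # Open the CUDA IPC handle: the storage points at the SAME device memory.
    storage = torch.UntypedStorage._new_shared_cuda(*handle)
    dtype = getattr(torch, entry["dtype"].removeprefix("torch."))
    t = torch.empty(0, dtype=dtype, device=f"cuda:{entry['device_index']}")
    # View the shared storage with the exported shape/stride/offset.
    return t.set_(storage, entry["storage_offset"], entry["shape"], entry["stride"])

with open("cuda_ipc_handles.json") as f:
    exported = json.load(f)

shared = {name: rebuild_shared_tensor(info)
          for name, info in exported["handles"].items()}
```

Because these tensors alias the worker's memory, in-place writes from the trainer are immediately visible to vLLM with no copy; the flip side is that the pickled handle ties both processes to compatible PyTorch builds and, as the docstring warns, to the same physical GPU.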