IPC updates

Jai Suphavadeeprasit 2025-12-10 16:59:51 -05:00
parent e278978fa1
commit b0d35be8a4
3 changed files with 247 additions and 15 deletions


@@ -314,6 +314,9 @@ class BridgeConfig:
     # vLLM server URL for HTTP-based sync (local mode)
     vllm_api_url: str = "http://localhost:9001"
 
+    # CUDA IPC mode: share GPU memory directly with vLLM (same GPU only!)
+    use_cuda_ipc: bool = False
+
     # Derived from environment
     num_gpus_per_node: int = field(default_factory=lambda: torch.cuda.device_count())
@@ -338,6 +341,7 @@ class BridgeConfig:
             device=config.device,
             log_dir=os.environ.get("LOGDIR"),
             vllm_api_url=f"http://localhost:{getattr(config, 'vllm_port', 9001)}",
+            use_cuda_ipc=getattr(config, 'use_cuda_ipc', False),
         )
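For context, a sketch of the trainer-side config object these `getattr` calls read from. `TrainConfig` is a hypothetical stand-in; only the attribute names `device`, `vllm_port`, and `use_cuda_ipc` appear in this commit:

# Hypothetical config consumed by the constructor above; the class name is
# a stand-in, the attribute names come from the hunks in this diff.
class TrainConfig:
    device: str = "cuda:0"
    vllm_port: int = 9001       # becomes vllm_api_url = "http://localhost:9001"
    use_cuda_ipc: bool = True   # trainer and vLLM must share the SAME GPU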
@@ -419,9 +423,12 @@ class VLLMWeightBridge:
         In local mode:
         - No NCCL process groups (trainer and vLLM are separate processes)
         - Communication via HTTP to vLLM's bridge endpoints
-        - Trainer loads its own model copy, updates are synced via checkpoints
+        - Trainer loads its own model copy, OR uses CUDA IPC for true shared memory
         """
-        print("[Bridge] Using LOCAL MODE (HTTP-based sync, no NCCL)")
+        if self.config.use_cuda_ipc:
+            print("[Bridge] Using CUDA IPC MODE (true shared GPU memory)")
+        else:
+            print("[Bridge] Using LOCAL MODE (HTTP-based sync, no NCCL)")
         print(f"[Bridge] vLLM API URL: {self.config.vllm_api_url}")
 
         # Verify vLLM server is reachable
@@ -436,12 +443,104 @@ class VLLMWeightBridge:
             print(f"[Bridge] Warning: Could not reach vLLM server: {e}")
             print("[Bridge] Training will continue, but vLLM sync may not work")
 
+        # For CUDA IPC mode, request vLLM to export IPC handles
+        if self.config.use_cuda_ipc:
+            self._request_cuda_ipc_export()
+            self._load_cuda_ipc_handles()
+
         # Load parameter mappings if available (optional in local mode)
         try:
             self._load_param_mappings()
         except RuntimeError:
             print("[Bridge] Parameter mapping file not found (optional in local mode)")
             self.param_mappings = {}
+
+    def _request_cuda_ipc_export(self) -> None:
+        """Request vLLM to export CUDA IPC handles."""
+        import requests
+
+        print("[Bridge] Requesting CUDA IPC handles from vLLM...")
+        try:
+            response = requests.post(
+                f"{self.config.vllm_api_url}/bridge/export_cuda_ipc",
+                timeout=60,
+            )
+        except Exception as e:
+            raise RuntimeError(f"Could not request CUDA IPC export: {e}") from e
+
+        if response.status_code != 200:
+            raise RuntimeError(f"Failed to export IPC handles: {response.status_code}")
+
+        result = response.json()
+        print(f"[Bridge] vLLM exported {result.get('num_parameters', 0)} IPC handles")
+
+    def _load_cuda_ipc_handles(self) -> None:
+        """
+        Load CUDA IPC handles from file and reconstruct shared tensors.
+
+        This is the key to TRUE shared memory - the tensors we create here
+        point to the SAME GPU memory that vLLM is using!
+        """
+        import base64
+        import pickle
+
+        log_dir = self.config.log_dir or os.environ.get("LOGDIR", ".")
+        ipc_path = Path(log_dir) / "cuda_ipc_handles.json"
+
+        # Wait for the file to be created
+        wait_time = 0
+        while not ipc_path.exists() and wait_time < self.config.timeout_seconds:
+            print(f"[Bridge] Waiting for {ipc_path}...")
+            time.sleep(1)
+            wait_time += 1
+
+        if not ipc_path.exists():
+            raise RuntimeError(f"CUDA IPC handles file not found: {ipc_path}")
+
+        with open(ipc_path, "r") as f:
+            data = json.load(f)
+
+        handles_data = data.get("handles", {})
+        print(f"[Bridge] Reconstructing {len(handles_data)} shared tensors from IPC handles...")
+
+        reconstructed = 0
+        for name, info in handles_data.items():
+            try:
+                # Decode the IPC handle
+                handle_bytes = base64.b64decode(info["ipc_handle"])
+                handle = pickle.loads(handle_bytes)
+
+                device = torch.device(f"cuda:{info['device_index']}")
+
+                # Get dtype
+                dtype_str = info["dtype"]
+                dtype = getattr(torch, dtype_str.replace("torch.", ""))
+
+                # Reconstruct the storage from the IPC handle. This does NOT
+                # allocate new memory - it maps vLLM's existing GPU memory.
+                # (Same private API torch.multiprocessing uses internally.)
+                storage = torch.UntypedStorage._new_shared_cuda(*handle)
+
+                # Create a tensor view of the shared storage
+                tensor = torch.tensor([], dtype=dtype, device=device)
+                tensor.set_(
+                    storage,
+                    info["storage_offset"],
+                    info["shape"],
+                    info["stride"],
+                )
+
+                # Store in shared_state_dict
+                self.shared_state_dict[name] = tensor
+                reconstructed += 1
+            except Exception as e:
+                print(f"[Bridge] Warning: Could not reconstruct {name}: {e}")
+                continue
+
+        print(f"[Bridge] Successfully reconstructed {reconstructed} shared tensors")
+        shared_gb = sum(t.numel() * t.element_size() for t in self.shared_state_dict.values()) / 1e9
+        print(f"[Bridge] Memory savings: ~{shared_gb:.1f} GB (no model copy needed!)")
     def _initialize_distributed_mode(self) -> None:
         """
@@ -561,6 +660,9 @@ class VLLMWeightBridge:
         """
         Get a model whose parameters point to vLLM's shared tensors.
 
+        In CUDA IPC mode: shared_state_dict is populated from IPC handles during init.
+        In other modes: must call attach_to_vllm_weights() first.
+
         This creates a HuggingFace model structure but replaces all parameters
         with references to the shared tensors. When the optimizer updates these
         parameters, it modifies vLLM's weights directly.
@@ -572,26 +674,42 @@ class VLLMWeightBridge:
             return self._model
 
         if not self.shared_state_dict:
-            raise RuntimeError(
-                "Must call attach_to_vllm_weights() before get_trainable_model()"
-            )
+            if self.config.use_cuda_ipc:
+                raise RuntimeError(
+                    "CUDA IPC mode enabled but no shared tensors found. "
+                    "Check that vLLM exported IPC handles correctly."
+                )
+            else:
+                raise RuntimeError(
+                    "Must call attach_to_vllm_weights() before get_trainable_model()"
+                )
 
         print(f"[Bridge] Creating trainable model for {self.config.model_name}")
+        if self.config.use_cuda_ipc:
+            print("[Bridge] Using CUDA IPC shared tensors (NO NEW GPU MEMORY!)")
 
         # Load model config (not weights)
         model_config = AutoConfig.from_pretrained(self.config.model_name)
 
-        # Create model with empty weights
+        # Create model with empty weights (meta device = no memory)
         with torch.device("meta"):
             model = AutoModelForCausalLM.from_config(model_config)
 
         # Replace each parameter with the shared tensor
         self._replace_parameters_with_shared(model)
 
+        # Move model structure to device (parameters already on device via IPC)
+        model.to(self.device)
+
         self._model = model
-        print(f"[Bridge] Trainable model ready with {sum(p.numel() for p in model.parameters())} parameters")
+        total_params = sum(p.numel() for p in model.parameters())
+        print(f"[Bridge] Trainable model ready with {total_params:,} parameters")
+
+        if self.config.use_cuda_ipc:
+            # Report memory savings (assumes bfloat16 weights)
+            param_memory_gb = total_params * 2 / 1e9  # bfloat16 = 2 bytes
+            print(f"[Bridge] CUDA IPC memory savings: ~{param_memory_gb:.1f} GB (shared with vLLM)")
+
         return model
 
     def _replace_parameters_with_shared(self, model: nn.Module) -> None:
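The body of _replace_parameters_with_shared() is not shown in this diff. A minimal sketch of what the docstring above describes, where every name other than shared_state_dict is an assumption; nn.Parameter wraps a tensor without copying it, so optimizer updates land in the IPC-mapped memory:

import torch.nn as nn

def _replace_parameters_with_shared(self, model: nn.Module) -> None:
    """Hypothetical sketch: re-point each parameter at the shared tensor."""
    for name, _ in list(model.named_parameters()):
        shared = self.shared_state_dict.get(name)
        if shared is None:
            continue  # not exported by vLLM; parameter stays on meta device
        parent_path, _, attr = name.rpartition(".")
        parent = model.get_submodule(parent_path) if parent_path else model
        # nn.Parameter shares the underlying storage -- no copy is made.
        setattr(parent, attr, nn.Parameter(shared, requires_grad=True))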