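The most surprising thing about PyTorch distributed training is that it often lowers per-GPU throughput compared with single-GPU training, because every step now pays for gradient communication, even though it lets you train on much larger datasets and, with the right strategy, much larger models.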

Let’s see it in action. Imagine you have a simple nn.Linear layer and want to train it across two GPUs.

import torch
import torch.nn as nn
import torch.distributed as dist
import os

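# --- Configuration ---
# torchrun exports RANK, LOCAL_RANK, and WORLD_SIZE for every process it starts.
RANK = int(os.environ['RANK'])
LOCAL_RANK = int(os.environ.get('LOCAL_RANK', RANK))  # GPU index on this machine
WORLD_SIZE = int(os.environ.get('WORLD_SIZE', 2))
MASTER_ADDR = os.environ.get('MASTER_ADDR', 'localhost')
MASTER_PORT = os.environ.get('MASTER_PORT', '29500')

# --- Initialization ---
dist.init_process_group("nccl", rank=RANK, world_size=WORLD_SIZE,
                        init_method=f"tcp://{MASTER_ADDR}:{MASTER_PORT}")

# --- Model and Data ---
# Index the GPU by LOCAL_RANK: on multi-node runs the global RANK can exceed
# the number of GPUs on a machine.
device = torch.device(f"cuda:{LOCAL_RANK}")
torch.cuda.set_device(device)
model = nn.Linear(1000, 1000).to(device)

# Wrap the model with DDP
# 'bucket_cap_mb' caps the size of each gradient bucket, in megabytes.
# It affects performance by trading the number of communication calls
# against how well communication overlaps with the backward pass.
ddp_model = nn.parallel.DistributedDataParallel(model, device_ids=[LOCAL_RANK],
                                              bucket_cap_mb=25)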

# Dummy data
input_data = torch.randn(32, 1000).to(device)
target = torch.randn(32, 1000).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)

# --- Training Step ---
optimizer.zero_grad()
outputs = ddp_model(input_data)
loss = criterion(outputs, target)
loss.backward()
optimizer.step()

print(f"Rank {RANK} loss: {loss.item()}")

# --- Cleanup ---
dist.destroy_process_group()

To run this, you’d typically launch it with torchrun, which supersedes the older torch.distributed.launch script:

torchrun --nproc_per_node=2 your_script_name.py

This sets up two processes, each on a different GPU, and establishes communication channels between them. The DistributedDataParallel (DDP) wrapper is key. It replicates your model on each GPU and handles gradient synchronization. When loss.backward() is called, each process computes gradients on its own batch, and DDP all-reduces them across GPUs as they become ready, overlapping communication with the rest of the backward pass. By the time backward() returns, every replica holds the same averaged gradients, so optimizer.step() applies an identical update on each GPU.

The problem distributed training solves is scaling. Training massive models like large language models or complex vision transformers on a single GPU is often infeasible, either because the model does not fit in memory or because training simply takes too long. Spreading the work across devices gives you two things:

  1. More GPUs: Splitting the data or the model across devices lets you train larger models or finish sooner.
  2. Larger batch sizes: Because gradients are pooled across GPUs, the effective batch size is the per-GPU batch size times the number of GPUs, which can sometimes improve convergence speed and model accuracy.

There are three main ways to split the work:

  1. Data parallelism (most common): Each GPU gets a full copy of the model and a different slice of the data, and gradients are averaged. This is what DDP does in the example above, so the model must still fit on a single GPU.
  2. Model parallelism: Different parts of the model reside on different GPUs. This is for models too large to fit on a single GPU.
  3. Pipeline parallelism: A form of model parallelism where layers are staged across GPUs, forming a pipeline.
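
To make the data-parallel split concrete, here is a minimal sketch of how a dataset might be divided between two ranks. It uses torch.utils.data.distributed.DistributedSampler, which the example above does not use (it feeds random tensors instead), and it passes num_replicas and rank explicitly so that it runs without a process group. The eight-sample dataset is a toy assumption.

```python
import torch
from torch.utils.data import TensorDataset
from torch.utils.data.distributed import DistributedSampler

# Toy dataset of eight samples (an assumption for illustration).
dataset = TensorDataset(torch.arange(8))

# Passing num_replicas and rank explicitly avoids needing an initialized
# process group; in a real job they would come from WORLD_SIZE and RANK.
shards = {
    rank: list(DistributedSampler(dataset, num_replicas=2, rank=rank, shuffle=False))
    for rank in range(2)
}

for rank, indices in shards.items():
    print(f"rank {rank} sees samples {indices}")
```

In a training script the sampler would typically be handed to a DataLoader through its sampler argument, and with shuffling enabled it is customary to call sampler.set_epoch(epoch) at the start of each epoch so the shuffle order changes between epochs.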

The core mechanism is gradient synchronization. DDP uses a collective communication operation, all_reduce, to average gradients. Each process computes its local gradients, all_reduce sums them so that every process ends up with the same total, and dividing by the world size turns that sum into an average. This averaged gradient is then used by the optimizer on each GPU, which keeps the replicas identical.
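
Why averaging works can be checked on a single CPU. The sketch below simulates two "ranks" inside one process by splitting a batch in half; it makes no actual all_reduce call, and the tiny model and batch sizes are assumptions for illustration. With equally sized shards and a mean-reduced loss, the average of the per-shard gradients should match the gradient of the full batch.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.Linear(4, 2)
criterion = nn.MSELoss()  # mean reduction, as in the example above
x = torch.randn(8, 4)
y = torch.randn(8, 2)

def grads_for(inputs, targets):
    """Return a flat copy of the model's gradients for one batch."""
    model.zero_grad()
    criterion(model(inputs), targets).backward()
    return torch.cat([p.grad.flatten().clone() for p in model.parameters()])

# Gradient a single GPU would compute on the whole batch.
full_batch_grad = grads_for(x, y)

# Gradients two simulated ranks would compute on their own halves.
shard_grads = [grads_for(x[:4], y[:4]), grads_for(x[4:], y[4:])]

# Sum across ranks and divide by the world size: the role all_reduce plays in DDP.
averaged_grad = torch.stack(shard_grads).sum(dim=0) / len(shard_grads)

matches_full_batch = torch.allclose(full_batch_grad, averaged_grad, atol=1e-6)
print(f"averaged shard gradients match full-batch gradient: {matches_full_batch}")
```

The equivalence relies on the shards being the same size; with uneven shards a plain average weights the samples unequally.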

The bucket_cap_mb parameter in DistributedDataParallel controls the maximum size of gradient "buckets." DDP groups gradients into these buckets and launches an all_reduce for a bucket as soon as all of its gradients have been computed, while the backward pass continues. This is a crucial performance tuning knob. Smaller buckets mean more frequent, smaller communication calls. Larger buckets mean fewer, larger calls. The optimal size balances the fixed overhead of launching each communication operation against how well communication overlaps with computation. If bucket_cap_mb is too small, the per-call overhead adds up. If it’s too large, communication starts later and overlaps less with the backward pass, so GPUs sit idle waiting for the final all_reduce to finish.
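
As a rough illustration of how the cap changes the number of communication calls, the sketch below groups parameter gradients greedily in reverse parameter order, which approximates the order in which they become ready during the backward pass. This is a simplified model, not DDP's actual bucketing code, and estimate_buckets and the two-layer model are hypothetical names introduced here.

```python
import torch.nn as nn

def estimate_buckets(model: nn.Module, bucket_cap_mb: float) -> list:
    """Greedily pack gradient sizes (in bytes) into buckets under the cap.

    Simplified: a gradient larger than the cap gets a bucket of its own.
    """
    cap_bytes = bucket_cap_mb * 1024 * 1024
    buckets, current = [], 0
    for param in reversed(list(model.parameters())):
        size = param.numel() * param.element_size()
        if current > 0 and current + size > cap_bytes:
            buckets.append(current)
            current = 0
        current += size
    if current > 0:
        buckets.append(current)
    return buckets

# Two 1000x1000 float32 layers: roughly 4 MB of gradients per weight matrix.
model = nn.Sequential(nn.Linear(1000, 1000), nn.ReLU(), nn.Linear(1000, 1000))

for cap_mb in (25, 5, 1):
    buckets = estimate_buckets(model, cap_mb)
    print(f"bucket_cap_mb={cap_mb}: {len(buckets)} all_reduce call(s)")
```

Under this simplified model, a lower cap splits the same gradients into more, smaller calls; the best setting for a real job is best found by measuring step time.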

The init_method parameter is vital for establishing the communication group. tcp://localhost:29500 is common for single-node, multi-GPU setups. For multi-node, you’d specify the IP address of the master node and a free port. nccl is the recommended backend for NVIDIA GPUs, offering high performance for collective operations.
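
The example above hard-codes "nccl", which needs CUDA. A small sketch of choosing the backend at runtime follows; pick_backend is a hypothetical helper, and falling back to the CPU-capable "gloo" backend is an assumption about what you want on machines without GPUs.

```python
import torch
import torch.distributed as dist

def pick_backend() -> str:
    """Return "nccl" when CUDA and NCCL are usable, otherwise "gloo"."""
    if (torch.cuda.is_available()
            and dist.is_available()
            and dist.is_nccl_available()):
        return "nccl"
    return "gloo"

backend = pick_backend()
print(f"selected backend: {backend}")
```

The result could then be passed as the first argument to dist.init_process_group in place of the literal "nccl".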

When you launch your distributed script, each process needs to know its RANK (its unique ID within the group, from 0 to WORLD_SIZE - 1) and the total WORLD_SIZE. These are typically set via environment variables, which torchrun handles automatically.
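
A minimal sketch of reading these variables in one place is shown below. RANK, LOCAL_RANK, and WORLD_SIZE are the names torchrun exports; DistEnv and read_dist_env are hypothetical helpers, and the single-process defaults are an assumption so the same script can also run without a launcher.

```python
import os
from dataclasses import dataclass
from typing import Mapping

@dataclass(frozen=True)
class DistEnv:
    rank: int        # unique ID across all processes
    local_rank: int  # ID among the processes on this machine
    world_size: int  # total number of processes

def read_dist_env(env: Mapping[str, str] = os.environ) -> DistEnv:
    """Read the rank variables, defaulting to a single-process run."""
    rank = int(env.get("RANK", 0))
    return DistEnv(
        rank=rank,
        local_rank=int(env.get("LOCAL_RANK", rank)),
        world_size=int(env.get("WORLD_SIZE", 1)),
    )

# What the second of two processes launched by torchrun would see.
print(read_dist_env({"RANK": "1", "LOCAL_RANK": "1", "WORLD_SIZE": "2"}))
# What a plain `python script.py` run would fall back to.
print(read_dist_env({}))
```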

You’ll find that many distributed training setups require careful management of these ranks and world sizes, especially when scaling beyond a single machine. Issues with network connectivity, firewall rules, or incorrect IP/port configurations are common.

The next hurdle you’ll likely face is optimizing communication. As your model and GPU count grow, the gradient synchronization that loss.backward() implicitly triggers becomes a bottleneck, and you’ll need to explore techniques like gradient accumulation, which synchronizes less often, or more advanced parallelism strategies.
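
Gradient accumulation can be sketched on a plain, unwrapped model on CPU. The sizes and the ACCUM_STEPS value below are assumptions for illustration. Each micro-batch loss is divided by the number of accumulation steps so that the summed gradients match what one large batch would produce.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.Linear(10, 1)
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

ACCUM_STEPS = 4  # micro-batches per optimizer step (illustrative value)
inputs = torch.randn(32, 10)
targets = torch.randn(32, 1)

optimizer.zero_grad()
for micro_inputs, micro_targets in zip(inputs.chunk(ACCUM_STEPS),
                                       targets.chunk(ACCUM_STEPS)):
    # Scale each loss so the accumulated gradient is an average, not a sum.
    loss = criterion(model(micro_inputs), micro_targets) / ACCUM_STEPS
    loss.backward()  # gradients add up in .grad across micro-batches

accumulated_grad = model.weight.grad.clone()
optimizer.step()  # one parameter update for the whole 32-sample batch
```

With a DDP-wrapped model, every backward() call would still trigger an all_reduce, so the non-final micro-batches are commonly run inside the ddp_model.no_sync() context manager, leaving a single synchronization per optimizer step.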
