相关部分关键代码如下:
DDP 初始化(src/utils/distributed.py)
def setup_distributed(requested_device: str = "auto") -> DistributedContext:
    """Build the DistributedContext describing the current process.

    When ``is_distributed_env()`` is false, a single-process context
    (rank 0, world size 1) is returned with the device picked by
    ``_resolve_single_process_device``.

    Otherwise RANK, LOCAL_RANK and WORLD_SIZE are read from the
    environment. If CUDA is available and the caller did not request
    ``"cpu"``, the process is bound to ``cuda:<local_rank>`` and the NCCL
    backend is used; in every other case the device is the CPU and the
    backend is gloo. The default process group is initialised only if it
    has not been initialised already.

    Args:
        requested_device: Device preference. In the distributed path only
            the value ``"cpu"`` is inspected (it forces CPU + gloo); in the
            single-process path the value is forwarded unchanged to
            ``_resolve_single_process_device``.

    Returns:
        The populated ``DistributedContext``.

    Raises:
        KeyError: In the distributed path, if RANK, LOCAL_RANK or
            WORLD_SIZE is missing from the environment.
        ValueError: If one of those variables is not an integer string.
    """
    if not is_distributed_env():
        return DistributedContext(
            distributed=False,
            rank=0,
            local_rank=0,
            world_size=1,
            device=_resolve_single_process_device(requested_device),
        )

    # Read lazily and in this order, so the first missing or malformed
    # variable is the one that raises.
    rank, local_rank, world_size = (
        int(os.environ[name]) for name in ("RANK", "LOCAL_RANK", "WORLD_SIZE")
    )

    use_cuda = torch.cuda.is_available() and requested_device != "cpu"
    if use_cuda:
        # Bind this process to its own GPU before any collective is issued.
        torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank) if use_cuda else torch.device("cpu")
    backend = "nccl" if use_cuda else "gloo"

    if not dist.is_initialized():
        dist.init_process_group(backend=backend)

    return DistributedContext(
        distributed=True,
        rank=rank,
        local_rank=local_rank,
        world_size=world_size,
        device=device,
    )
训练入口 DDP 包装(scripts/train_stage2.py)
# In distributed runs, wrap both models in DDP between two barriers:
# the first is issued after model loading, the second after both wraps.
if distributed_context.is_distributed:
    dist.barrier()  # all ranks finish model loading before DDP wrapping

    from torch.nn.parallel import DistributedDataParallel

    # Settings shared by both wrappers; device_ids is only passed when the
    # target device is a CUDA device.
    ddp_kwargs = dict(
        device_ids=[distributed_context.local_rank] if device.type == "cuda" else None,
        find_unused_parameters=False,
        broadcast_buffers=False,
    )
    generator = DistributedDataParallel(generator, **ddp_kwargs)
    discriminator = DistributedDataParallel(discriminator, **ddp_kwargs)

    dist.barrier()  # DDP construction done
训练循环中的 collective 操作(src/training/trainer_stage2.py)
# Runs when a gradient-accumulation window completes, i.e. on every
# `gradient_accumulation_steps`-th value of (step + 1).
if (step + 1) % gradient_accumulation_steps == 0:
self.scaler.step(self.gen_optimizer) # DDP backward triggers the allreduce
# NOTE(review): the step above uses `self.gen_optimizer` but the gradients are
# cleared on `self.optimizer` — confirm both names refer to the same optimizer.
# NOTE(review): no `self.scaler.update()` is visible in this excerpt — confirm
# it is called elsewhere.
self.optimizer.zero_grad()
global_step += 1
# Evaluation (contains a barrier + all_reduce). Presumably every rank must take
# this branch on the same global_step — verify that global_step advances
# identically on all ranks.
if global_step % eval_steps == 0:
eval_metrics = self._evaluate() # barrier -> generate -> all_reduce
# Evaluation routine (excerpt: the validation loop itself is elided below).
# In distributed runs it first issues a barrier, and at the end it combines
# `totals` across ranks through `distributed_sum`.
def _evaluate(self) -> dict:
if self.distributed_context.is_distributed:
torch.distributed.barrier() # all ranks synchronize on entry
# ... iterate over the validation set: generate + score ...
totals = distributed_sum(totals, self.distributed_context) # all_reduce
崩溃日志摘要
模型加载完成后,只有 4 个 rank 到达 barrier 并一直阻塞等待,其余 rank 始终未能到达该 barrier → 训练无法启动
Discriminator loaded from checkpoint: checkpoints/discriminator/best/best.pt # ×4 条
[MXKW][E] ioctl create queue block timeout, gpu_id:51332 type:21. Retrying. # 持续重复
DDP collective 不匹配崩溃
[rank6]: RuntimeError: Detected mismatch between collectives on ranks.
Rank 6 is running collective: ALLREDUCE (backward pass)
Rank 0 is running collective: BARRIER (_validate)
启动命令
# Single-node torchrun launch (--standalone) with 8 worker processes and
# GPUs 0-7 made visible; --device cuda is passed through to the training script.
CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 torchrun --standalone --nproc_per_node=8 \
scripts/train_stage2.py \
--config configs/training/stage2.yaml \
--generator-config configs/model/generator.yaml \
--discriminator-config configs/model/discriminator.yaml \
--device cuda