3. DLRover用户指南

3.1. 主要组件的配置

3.1.1. Helm Chart的配置

DLRover的controller通过Helm的方式部署,可选参数说明参见下表。

表 3.1 Helm Chart安装的可选参数说明

参数

说明

registry

若集群使用本地镜像仓库,可设置此参数为 domain/project

topoMapper.topology_config

用于网络拓扑感知的配置文件,通常需要手动修改

controller.image

controller组件镜像名称,默认为 elasticjob-controller

dlroverMaster.image

dlrover-master组件镜像名称,默认为 dlrover-master

controller.image.tag

指定controller组件的image tag

dlroverMaster.image.tag

指定dlrover master的image tag

3.1.2. 提交训练任务的配置

训练任务YAML配置文件中的主要参数说明,参见下表。

表 3.2 YAML文件主要参数说明

参数

说明

metadata.name

任务名称

metadata.namespace

任务所在的命名空间,通常为 dlrover

spec.distributionStrategy

任务的分布式策略,PyTorch任务为 AllreduceStrategy

spec.optimizeMode

任务模式,通常为 single-job,无需修改

spec.replicaSpecs.worker.replicas

子任务副本数

spec.replicaSpecs.worker.restartCount

子任务最大的重启次数,默认为 3

spec.replicaSpecs.worker.template.spec.containers.image

任务镜像名称

spec.replicaSpecs.worker.template.spec.containers.imagePullPolicy

任务镜像的拉取策略,可选为 AlwaysNeverIfNotPresent

spec.replicaSpecs.worker.template.spec.containers.command

任务启动所执行的命令

spec.replicaSpecs.worker.template.spec.containers.resources

子任务所需的资源,如GPU资源 metax-tech.com/gpu

spec.replicaSpecs.dlrover-master.template.spec.containers.image

指定dlrover-master容器的镜像名,缺省可以不指定dlrover-master的配置

spec.replicaSpecs.dlrover-master.template.spec.containers.command

指定dlrover-master容器的执行命令,缺省可以不指定dlrover-master的配置

3.1.3. dlrover-run的配置

dlrover-run 已集成在DLRover的Python wheel包中,安装训练镜像后即可直接使用。主要参数说明参见下表。

表 3.3 dlrover-run命令的主要参数说明

参数

说明

--max-restarts

训练进程的最大重启次数,达到最大值后将判定为子任务失败,通常配置为3次

--network-check

训练启动或重启之前,将进行网络检测。检测失败的节点将被标记为故障,不参与训练任务

--exclude-straggler

掉队检测,需要开启 --network-check。若网络检测运行时间长于其他节点的均值的两倍,将判定为掉队,不参与训练任务

--switchbox-check

网络检测的流程中,增加对于主机环境的switchbox 到GPU之间的光互连检查,默认为 false(关闭)

--box-pairs

指定switchbox中的socket配对,格式为 pairA_1:pairA_2 pairB_1:pairB_2, 默认为 0:1 2:4 3:5 6:7

--min-bandwidth

指定switchbox 到 GPU的最低门限带宽,低于这个值被认定为故障连接,单位为MB/s,默认为 10000MB/s

--min-channels

指定switchbox 到 GPU之间的通道对正常工作的最小通道数,低于这个值被认定为节点故障,默认为 2,表示有2组switch pair正常就能通过switchbox检查

--standalone

单独在容器运行时需要指定该参数

备注

dlrover-run 的其他参数与 torchrun 相同,且能自动处理 master_addrmaster-port 参数,用户无需单独指定。

3.1.4. Flash Checkpoint的Python API应用

本组API接口主要用于训练进程,在保存和读取Checkpoint时,可替换训练框架中的对应代码。

3.1.4.1. DDP框架Flash Checkpoint应用示例

from dlrover.trainer.torch.flash_checkpoint.ddp import (
    DdpCheckpointer,
    StorageType,)

checkpointer = DdpCheckpointer(checkpoint_dir)

state_dict = {
    "model": model.state_dict(),
    "optimizer": optimizer.state_dict(),
    "step": step,}# 存储系统的 path
ckpt_path = os.path.join(checkpoint_dir, f"checkpoint-{iter_num}.pt")
# 将 checkpoint 秒级存入到内存中,可以很高频地写。如果训练进程失败,会自动
# 将内存中最近的 checkpoint 写入存储。
if iter_num % save_memory_interval == 0:
    checkpointer.save_checkpoint(
        step, state_dict, ckpt_path, storage_type=StorageType.MEMORY
)
# 将 checkpoint 异步存入到存储中,可以低频导出,也可以高频导出,但是高频导出会
# 占据很多存储空间,用户需要自行清理老的Checkpoint。
if iter_num % save_storage_interval == 0:
    checkpointer.save_checkpoint(
        step, state_dict, ckpt_path, storage_type=StorageType.DISK
    )
ckpt_dict = checkpointer.load_checkpoint()
model.load_state_dict(ckpt_dict["model"])
optimizer.load_state_dict(ckpt_dict["optimizer"]

3.1.4.2. FSDP框架Flash Checkpoint应用示例

保存Checkpoint的API使用示例如下:

from dlrover.trainer.torch.flash_checkpoint.fsdp import (
    FsdpShardCheckpointer,
    StorageType,
)

checkpointer = FsdpShardCheckpointer(checkpoint_dir)

with FSDP.state_dict_type(model, StateDictType.SHARDED_STATE_DICT):
    state_dict = {
        "model": model.state_dict(),
        "optim": FSDP.optim_state_dict(model, optimizer),
        "step": step,
    }
# 存储系统的 directory
    ckpt_dir = os.path.join(checkpoint_dir, str(step))
# 将 checkpoint 秒级存入到内存中,可以很高频地写。如果训练进程失败,会自动
# 将内存中最近的 checkpoint 写入存储。
    if step % save_memory_interval == 0:
        checkpointer.save_checkpoint(
            step, state_dict, ckpt_dir, storage_type=StorageType.MEMORY
        )
# 将 checkpoint 异步存入到存储中,可以低频导出,也可以高频导出,但是高频导出会
# 占据很多存储空间,用户需要自行清理老的Checkpoint。
    if step % save_storage_interval == 0:
        checkpointer.save_checkpoint(
            step, state_dict, ckpt_dir, storage_type=StorageType.DISK
        )

读取Checkpoint的API使用示例如下,storage_reader 配置为 Flash Checkpoint 支持 FSDP 的 reader 即可。

checkpointer = FsdpShardCheckpointer(checkpoint_dir)

with FSDP.state_dict_type(model, StateDictType.SHARDED_STATE_DICT):
    state_dict = {
        "model": model.state_dict(),
        "step": 0,
    }
    storage_reader = checkpointer.get_storage_reader()
    if not storage_reader:
        return
    dist_cp.load_state_dict(
        state_dict=state_dict,
        storage_reader=storage_reader,
    )
    model.load_state_dict(state_dict["model"])

    optim_state = load_sharded_optimizer_state_dict(
        model_state_dict=state_dict["model"],
        optimizer_key="optim",
        storage_reader=storage_reader,
    )

    flattened_osd = FSDP.optim_state_dict_to_load(
        model, optimizer, optim_state["optim"]
    )
    optimizer.load_state_dict(flattened_osd)

3.1.4.3. Megatron-LM框架Flash Checkpoint应用示例

存取Checkpoint的API和Megatron原生类似,使用示例如下:

# from megatron.checkpointing import load_checkpoint
# from megatron.checkpointing import save_checkpoint
from dlrover.trainer.torch.flash_checkpoint.megatron_dist_ckpt import save_checkpoint
from dlrover.trainer.torch.flash_checkpoint.megatron_dist_ckpt import load_checkpoint
from dlrover.trainer.torch.flash_checkpoint.megatron import StorageType

可以选择将Checkpoint保存到内存(StorageType.MEMORY)或者磁盘(StorageType.DISK)。通过StorageType.MEMORY将Checkpoint保存至内存的示例如下:

if args.save and iteration % save_memory_interval == 0:
    save_checkpoint(iteration, model, optimizer, num_floating_point_operations_so_far=0,
                    opt_param_scheduler, storage_type=StorageType.MEMORY,)

args.iteration, _ = load_checkpoint(model, optimizer, opt_param_scheduler)

支持设置备份组的大小,在调用 save_checkpointload_checkpoint 操作时需指定。

if args.save and iteration % save_memory_interval == 0:
    save_checkpoint(iteration, model, optimizer, num_floating_point_operations_so_far=0,
                    opt_param_scheduler, storage_type=StorageType.MEMORY, replica_count=2)

args.iteration, _ = load_checkpoint(model, optimizer, opt_param_scheduler,replica_count=2)

当前支持的Megatron-LM框架版本 ≥ 0.4.0。

0.5.0版本之前,save_checkpointnum_floating_point_operations_so_far 参数,可传入 0load_checkpoint 返回 stat_dictnum_floating_point_operations_so_far,0.5.0版本之前可忽略 num_floating_point_operations_so_far

3.2. 基本功能点配置

3.2.1. Spark数据预处理

DLRover提供了基于Spark的分布式处理能力进行训练数据的预处理,借助强大的Spark弹性分布式内存计算能力,可以为常用的训练模型提供快速的数据预处理。 详细的Spark使用参见 Spark官方文档

以下以Spark在Megatron训练数据预处理中的应用为例,说明Spark的数据预处理能力。

Spark数据预处理关键代码位于 /path/to/dlrover_wheel_package/dlrover/python/preprocess/pyspark_preprocess.py,代码示例如下:

from pyspark.sql import SparkSession

import sys
import os
import lm_dataformat as lmd
import argparse
import numpy as np
import cloudpickle
import ftfy
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__) + '/megatron')))
print(sys.path)
import megatron
from megatron.tokenizer import build_tokenizer
from megatron.data import indexed_dataset

def get_args():
parser = argparse.ArgumentParser()
group = parser.add_argument_group(title="input data")
group.add_argument(
"--input",
type=str,
required=True,
help="Path to input jsonl files or lmd archive(s) - if using multiple archives, put them in a comma separated "
"list",
)
group.add_argument(
"--jsonl-keys",
nargs="+",
default=["text"],
help="space separate listed of keys to extract from jsonl. Defa",
)
group.add_argument(
"--num-docs",
default=None,
help="Optional: Number of documents in the input data (if known) for an accurate progress bar.",
type=int,
)
group = parser.add_argument_group(title="tokenizer")
group.add_argument(
"--tokenizer-type",
type=str,
required=True,
choices=[
"HFGPT2Tokenizer",
"HFTokenizer",
"GPT2BPETokenizer",
"CharLevelTokenizer",
"TiktokenTokenizer",
],
help="What type of tokenizer to use.",
)
group.add_argument(
"--vocab-file", type=str, default=None, help="Path to the vocab file"
)
group.add_argument(
"--merge-file",
type=str,
default=None,
help="Path to the BPE merge file (if necessary).",
)
group.add_argument(
"--append-eod",
action="store_true",
help="Append an <eod> token to the end of a document.",
)
group.add_argument("--ftfy", action="store_true", help="Use ftfy to clean text")
group = parser.add_argument_group(title="output data")
group.add_argument(
"--output-prefix",
type=str,
required=True,
help="Path to binary output file without suffix",
)
group.add_argument(
"--dataset-impl",
type=str,
default="mmap",
choices=["lazy", "cached", "mmap"],
help="Dataset implementation to use. Default: mmap",
)

group = parser.add_argument_group(title="runtime")
group.add_argument(
"--workers", type=int, default=1, help="Number of worker processes to launch"
)
group.add_argument(
"--log-interval",
type=int,
default=100,
help="Interval between progress updates",
)
args = parser.parse_args()
args.keep_empty = False
# some default/dummy values for the tokenizer
args.rank = 0
args.make_vocab_size_divisible_by = 128
args.model_parallel_size = 1

return args

class Encoder(object):
def __init__(self, args):
self.args = args
self.builders = {}
self.output_bin_files = {}
self.output_idx_files = {}
self.tokenizer = None

def initializer(self):
# Use Encoder class as a container for global data
self.tokenizer = build_tokenizer(self.args)
for key in self.args.jsonl_keys:
self.output_bin_files[key] = '{}_{}_{}.bin'.format(
self.args.output_prefix, key, 'document'
)
self.output_idx_files[key] = '{}_{}_{}.idx'.format(
self.args.output_prefix, key, 'document'
)
return self.tokenizer

def get_builder(self, key):
if key not in self.builders:
self.builders[key] = indexed_dataset.make_builder(
self.output_bin_files[key],
impl = self.args.dataset_impl,
vocab_size = self.tokenizer.vocab_size,
)
return self.builders[key]

def encode(self, text):
if self.args.ftfy:
text = ftfy.fix_text(text)
ids = {}
for key in self.args.jsonl_keys:
doc_ids = []
text_ids = self.tokenizer.tokenize(text)
if len(text_ids) > 0:
doc_ids.append(text_ids)
if self.args.append_eod:
doc_ids[-1].append(self.tokenizer.eod)
ids[key] = doc_ids
return ids, len(text)

def main():
spark = SparkSession.builder.appName('gpt-neox-process').getOrCreate()
args = get_args()
encoder = Encoder(args)
tokenizer = encoder.initializer()
broadcast_encoder = spark.sparkContext.broadcast(cloudpickle.dumps(encoder))

# 读取数据并缓存
lm_rdd = spark.sparkContext.parallelize(lmd.Reader(args.input).stream_data()) \
.repartition(200) # 调整为适当的分区数

# 使用 mapPartitions 替代 flatMap,减少数据传输
def process_partition(partition):
encoder = cloudpickle.loads(broadcast_encoder.value)
results = {key: [] for key in args.jsonl_keys}
for doc in partition:
encoded_doc, _ = encoder.encode(doc)
for key, value_list in encoded_doc.items():
results[key].extend(value_list)
return results.items()

def save_partition(partition):
encoder = cloudpickle.loads(broadcast_encoder.value)
encoder.initializer()

def add_item(key, value):
encoder.get_builder(key)
if isinstance(value, list):
for subitem in value:
if isinstance(subitem, list):
for e in subitem:
encoder.builders[key].add_item(np.array(e, dtype=encoder.builders[key].dtype))
else:
encoder.builders[key].add_item(np.array(value, dtype=encoder.builders[key].dtype))
encoder.builders[key].end_document()

for key, value in partition:
add_item(key, value)

for key in encoder.builders:
encoder.builders[key].finalize(encoder.output_idx_files[key])

lm_map_data = lm_rdd.mapPartitions(process_partition) \
.groupByKey().mapValues(list) \
.persist() # 持久化结果

# 触发计算并将结果保存到文件
result = lm_map_data.foreachPartition(save_partition)

# 清理缓存
lm_rdd.unpersist()
lm_map_data.unpersist()

# 关闭 SparkSession
spark.stop()

if __name__ == "__main__":
main()

Spark数据预处理的执行命令如下:

$ $SPARK_HOME/bin/spark-submit --executor-cores=2 --num-executors=2 ./pyspark_preprocess.py \
--input $DATA_PATH/input/stackexchange_aa.jsonl --output-prefix $DATA_PATH/output/result \
--vocab $DATA_PATH/20B_tokenizer.json --tokenizer-type HFTokenizer --append-eod --jsonl-keys text \
--workers 64 --append-eod

会在 $DATA_PATH/output 目录产生如下2个文件:

result_text_document.bin
result_text_document.idx

3.2.2. Flash Checkpoint异步保存

DLRover 提供异步保存Checkpoint的功能。Flash Checkpoint先同步从GPU保存至内存中,然后再异步持久化到磁盘。

加载Checkpoint时,优先从内存中读取。若内存中找不到,则会去磁盘加载Checkpoint。

保存Checkpoint时,支持选择保存在内存或者磁盘中。

目前分布支持的框架包括Megatron-LM,Torch DDP,Torch FSDP,HuggingFace和DeepSpeed等。详细的API接口参见 3.1.4 Flash Checkpoint的Python API应用

3.2.3. 网络检查功能开启

训练脚本中,dlrover-run 增加 --network-check 选项开启网络检查的功能。

  • 网络检查发生在训练任务开始之前,对于节点两两分组检查。

  • 对于网络节点检测失败的组,在下一轮和正常的组混合,从而确定故障的节点。

  • 对于故障的节点,DLRover支持标记为污点,新节点会重新加入训练组,确保新调度到的节点没有故障。

在执行 fine_tuning.py 脚本之前,会做网络检查。示例如下:

"dlrover-run --nnodes=$NODE_NUM --network-check \
--nproc_per_node=1 --max_restarts=1 \
./examples/pytorch/llama2/fine_tuning.py \
./examples/pytorch/llama2/btc_tweets_sentiment.json"

3.2.4. 动态资源扩缩容配置

dlrover-run 支持复用Torch的 --nnode 选项设置最大最小节点个数。当集群节点数量发生变化且满足约束条件时,DLRover支持扩缩容后重新开始训练。

  • 当训练任务下发以后,如果有新的节点加入,只要满足节点数约束,DLRover会扩容后重新开始训练任务。

  • 如果训练的节点发生故障且没有新节点加入时,只要满足节点数约束,DLRover会缩容后重新开始训练任务。

目前动态资源扩缩容仅支持Torch DDP模型。

在执行分布式训练的过程中,满足 min ≤ 节点数量 ≤ max 时,训练将立即开始。示例如下:

"dlrover-run --nnodes=min:max \
--nproc_per_node=1 --max_restarts=1 \
./examples/pytorch/llama2/fine_tuning.py \
./examples/pytorch/llama2/btc_tweets_sentiment.json"

3.2.5. 故障恢复与重启次数的配置

目前通过训练的YAML配置以及 dlrover-run 的配置设置重启的次数。

dlrover-run 通过设置 --max-restarts 指定子任务的重启次数,重启次数超过指定次数以后,训练的脚本会退出,子任务重启。主要应用在通过重启Python进程能恢复的故障。

通过训练的YAML配置 spec.replicaSpecs.worker.restartCounts 可以设置整个弹性训练中该rank的重启次数,即该rank旧的pod退出后分配新的pod的次数。 重启次数超过指定次数以后,训练任务会退出。主要应用在换节点也无法恢复的故障,该故障可能与训练框架有关。

在执行 fine_tuning.py 脚本的故障恢复时,如果没有超过3次,会在原节点自动重启训练子任务尝试恢复。示例如下:

"dlrover-run --nnodes=$NODE_NUM \
--nproc_per_node=1 --max_restarts=3 \
./examples/pytorch/llama2/fine_tuning.py \
./examples/pytorch/llama2/btc_tweets_sentiment.json"

YAML文件如下,指定了弹性训练中该rank的重启次数为 3

apiVersion: elastic.iml.github.io/v1alpha1
kind: ElasticJob
metadata:
  name: fine-tuning-llama2
  namespace: dlrover
spec:
  distributionStrategy: AllreduceStrategy
  optimizeMode: single-job
  replicaSpecs:
    worker:
      replicas: 4  # 任务数量
      restartCounts: 3 #重启次数
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              # yamllint disable-line rule:line-length
              image: registry.cn-hangzhou.aliyuncs.com/intell-ai/dlrover:llama-finetuning # 训练镜像
              imagePullPolicy: Always
              command: # 训练执行命令
                - /bin/bash
                - -c
                - "dlrover-run --nnodes=$NODE_NUM \
                  --nproc_per_node=1 --max_restarts=1  \
                  ./examples/pytorch/llama2/fine_tuning.py  \
                  ./examples/pytorch/llama2/btc_tweets_sentiment.json"
              resources:
                limits:
                  cpu: "8"
                  memory: 16Gi
                  metax-tech.com/gpu: 8 # 单个任务申请的GPU数量

3.2.6. 基于网络拓扑感知的调度优化

DLRover提供了基于网络拓扑感知的调度优化功能。支持通过configmap配置的方式,输入节点的网络拓扑,根据configmap配置的网络拓扑,对训练的集群节点进行排序。

默认情况下名称为 node-topology-config 的configmap配置为空,会随机给训练任务节点分配rank号;需要添加网络拓扑配置情况下,支持通过 kubectl edit 编辑网络拓扑配置。 排序后的节点集群网络的拓扑关系上邻近的节点拥有相邻的rank号。

目前的排序规则只负责按照集群节点上一层交换机从属关系编排rank号。

在执行任何分布式训练任务或网络测试之前,检查编辑网络拓扑的configmap配置。示例如下:

$ kubectl describe configmap node-topology-config -n dlrover
Name:         node-topology-config
Namespace:    dlrover
Labels:       app.kubernetes.io/managed-by=Helm
Annotations:

Data
====
topology_config:
----

asw-0 psw-0 192.168.0.0
asw-1 psw-0 192.168.0.1
asw-2 psw-0 192.168.0.2
asw-3 psw-0 192.168.0.3
asw-0 psw-0 192.168.0.4
asw-1 psw-0 192.168.0.5
asw-2 psw-0 192.168.0.6
asw-3 psw-0 192.168.0.7
asw-0 psw-0 192.168.0.8
asw-1 psw-0 192.168.0.9


BinaryData
====

如需新建或修改网络拓扑的configmap配置,可以编辑Helm values.yamltopoMapper. topology_config 字段,执行以下命令更新Helm Chart。

$ helm upgrade -n dlrover <release_name> <path_to_chart>

使用 kubectl logs -n dlrover <master-pod name> 命令检查日志会提供按rank排序的IP列表。

备注

配置拓扑排序后,训练的故障恢复必须从磁盘恢复。

3.2.7. 基于switchbox网络拓扑的网络检查

DLRover提供了基于switchbox网络拓扑的网络检查功能。可以检查单机GPU到switchbox之间的网络链路状态。单机检查采用 GPUA-switchbox-GPUB 的方式两两分组检测。发现链路故障后,会将单机设置污点,避免被调度到。

训练脚本中, dlrover-run 增加如下选项配置网络检查功能:

  • --switchbox-check:开启switchbox网络检查,需要在network-check开启之后使用。

  • --box-pairs:指定switchbox中的socketid对,默认为 0:1 2:4 3:5 6:7 四对。

  • --min-bandwidth:指定GPU到switchbox之间的最低门限带宽(MB/s),默认为 10000MB/s,低于该设置值会被认定为链路故障。

  • --min-channels:指定switchbox 到 GPU之间的通道对正常工作的最小通道数,默认为2组。

开启switchbox网络故障检测的配置示例如下:

"dlrover-run --network-check --switchbox-check --nnodes=$NODE_NUM \
--nproc_per_node=1 --max_restarts=3 \
./examples/pytorch/llama2/fine_tuning.py \
./examples/pytorch/llama2/btc_tweets_sentiment.json"

3.2.8. 基于xpu_timer的hang检测

xpu_timer 可以hook核函数并注入计时函数,基于计时函数判断指定核函数的执行时间是否超时。超时后会推送指标到server,标志该rank为hang住的状态, 并输出hang住的进程的栈信息。 用户通过查看多个hang进程合并后的Python和C的调用栈,分析具体hang的原因。

安装

执行以下命令安装xpu_timer:

pip install py_xpu_timer-1.1+maca231-cp310-cp310-linux_x86_64.whl

基本配置

  • 通过 xpu_timer_launch python xxx.py 加载程序。

  • 环境变量

    • XPU_TIMER_HANG_TIMEOUT:hang超时时间,缺省300秒

    • XPU_TIMER_TIMELINE_PATH:进程调用栈stacktrace和timeline输出的路径,缺省 /root/timeline

  • 通过 xpu_timer_stacktrace_viewer --path /root/timeline 合并调用栈,分别合并Python和C的调用栈。每个线程一个堆栈。

  • 合并堆栈命名规则为:func@source_path@stuck_rank|leak_rank

    • func:表示当前函数名,如果 gdb 获取不到会显示 ??

    • source_path:表示这个函数符号在代码中的源地址

    • stuck_rank:表示哪些 rank 的栈被合并,连续的 rank 号会折叠为 start-end,如 rank 0,1,2,3 折叠为 0-3

    • leak_rank:表示哪些rank 的栈没有被合并,连续的 rank 号也会折叠

示例

以两机llama2 7b 训练为例子,过程中reset 网卡触发超时。

  • reset网卡5分钟之后,部分的rank报hang超时(1,2,3,9,11~15),hang超时的rank输出相应的stacktrace文件到指定的目录。

  • 通过工具 xpu_timer_stacktrace_viewer --path /root/timeline 合并调用栈, 生成 cpp_stack.svgpy_stack.svg,如图 3.1 所示。

../_images/stacktrace.png

图 3.1 输出的stacktrace文件及合并后的svg文件

  • Python的调用栈如图 3.2 所示,可以看到部分卡在forward的P2P通信,部分卡在backward的P2P通信,这是底层网络故障导致的。

../_images/mergesvg.png

图 3.2 py_stack.svg示例

3.2.9. 支持gang-scheduler

支持批量调度的组件,DLRover可以在任务部署的yaml中指定调度器和引用的方式来调度到指定的调度器,由调度器根据当前的环境以及配置中的约束条件,判断是否批量调度。

下面以koordinator为例。koordinator详细信息可以参见 koordinator 官方文档

安装koordinator

helm repo add koordinator-sh https://koordinator-sh.github.io/charts/
helm repo update
helm install koordinator koordinator-sh/koordinator  --set imageRepositoryHost=registry.cn-beijing.aliyuncs.com

验证部署:

kubectl get pods -n koordinator-system -owide | grep Running
koord-descheduler-7fd487c968-7fcwf   1/1     Running                  0             7d      10.244.1.147     mxsrv003        <none>           <none>
koord-descheduler-7fd487c968-wkzk2   1/1     Running                  0             12d     10.244.2.50      mx-worker-146   <none>           <none>
koord-manager-5d975967b-9gc28        1/1     Running                  0             7d      10.244.1.146     mxsrv003        <none>           <none>
koord-manager-5d975967b-gcj92        1/1     Running                  0             12d     10.244.2.49      mx-worker-146   <none>           <none>
koord-scheduler-6f7c854dd6-jwzld     1/1     Running                  0             12d     10.244.2.51      mx-worker-146   <none>           <none>
koord-scheduler-6f7c854dd6-p9658     1/1     Running                  0             7d      10.244.1.145     mxsrv003        <none>           <none>
koordlet-7g859                       1/1     Running                  1 (12d ago)   12d     10.107.204.143   mxsrv003        <none>           <none>
koordlet-grzl4                       1/1     Running                  0             12d     10.107.204.146   mx-worker-146   <none>           <none>
koordlet-jn6nf                       1/1     Running                  0             12d     10.107.204.177   mxmaster1       <none>           <none>
koordlet-jsfdn                       1/1     Running                  0             5d23h   10.107.204.141   mxsrv001        <none>           <none>

配置创建podgroup

设置批量最小个数为4:

apiVersion: scheduling.sigs.k8s.io/v1alpha1
kind: PodGroup
metadata:
  name: torch-mnist-gang
  namespace: dlrover
spec:
  scheduleTimeoutSeconds: 100
  minMember: 4

查看部署结果:

kubectl get podgroups.scheduling.sigs.k8s.io -n dlrover
NAME               AGE
torch-mnist-gang   5s

创建弹性训练任务

任务配置参考如下,需要指定 schedulerNameannotations

apiVersion: elastic.iml.github.io/v1alpha1
kind: ElasticJob
metadata:
  name: torch-mnist-gang
  namespace: dlrover
spec:
  distributionStrategy: AllreduceStrategy
  optimizeMode: single-job
  replicaSpecs:
    worker:
      replicas: 2
      template:
        metadata:
          annotations:
            gang.scheduling.koordinator.sh/name: "torch-mnist-gang"
            gang.scheduling.koordinator.sh/min-available: "4"
        spec:
          restartPolicy: Always
          schedulerName: koord-scheduler
          containers:
            - name: main
              image: xxx/torch:version
              imagePullPolicy: Always
              command:
                - /bin/bash
                - -c
                # NODE_NUM is set into env with the value as replicas.
                - "sleep infinity"
              resources:
                limits:
                  cpu: "4"  # turn up when using GPU
                  memory: 8Gi  # turn up when using GPU
                  metax-tech.com/gpu: 1 # optional
                requests:
                  cpu: "4"  # turn up when using GPU
                  memory: 8Gi  # turn up when using GPU
                  metax-tech.com/gpu: 1  # optional

当批量的条件没有满足时,会处于pending的状态,直到初次满足条件后,会允许该组pod被调度。本次因为不满足最小批量4的条件,调度处于pending的状态:

kubectl get pods -n dlrover
NAME                                           READY   STATUS    RESTARTS   AGE
dlrover-controller-manager-d4dfff6dc-d5lt4     2/2     Running   0          5d
elasticjob-torch-mnist-gang-dlrover-master-0   1/1     Running   0          11s
torch-mnist-gang-edljob-worker-0               0/1     Pending   0          3s
torch-mnist-gang-edljob-worker-1               0/1     Pending   0          3s

gang.scheduling.koordinator.sh/min-availableminMember 改为 2,重新下发,可以看到pod处于running状态:

kubectl get pods -n dlrover
NAME                                           READY   STATUS    RESTARTS   AGE
dlrover-controller-manager-d4dfff6dc-d5lt4     2/2     Running   0          5d21h
elasticjob-torch-mnist-gang-dlrover-master-0   1/1     Running   0          66s
torch-mnist-gang-edljob-worker-0               1/1     Running   0          57s
torch-mnist-gang-edljob-worker-1               1/1     Running   0          57s