Pytorch/elstic 弹性分布式研读
项目介绍
Pytorch/elastic (下称 elastic) 是 pytorch 1.4 作为新 feature 和 pytorch/serve 等功能一同引入的。从使用上,可以看出 elastic 有意地维持了跟原来使用 torch.distributed.launch 等接口的相似性,大致看使用过 pytorch 提供的分布式训练脚本,基本上可以很快配置好 elastic 分布式任务。学习成本相对较低。
系统架构
下图为 pytorch/elastic 系统设计示例图。
![](v2-20dd0be5ff19d3549c174774afa899ec_b.png)
Elastic 主要的原理是在每一个正在跑 pytorch 分布式任务的 node 上,会有一个代理进程(下称 agent) 不断地去观测 node 上分布式任务进程的状态,然后把状态上报到一个 key-value 的 server 上,默认使用的是 etcd。
agent 是每一个 process group 在 每个 node 上面跑的管理进程。主要是同步该 node 跟其他 node 的关系(membership)。(如:其他 node 上的进程死掉了,我的 rank 和 world_size 应该怎么变化)。
不同 node 的关系是这样处理的,如果有一个 node 上的 worker 出现了错误,这个 node 的 agent 会去重新启动这些 worker,如果是这个节点出了问题,就是agent 也没有了响应,需要更加高层的 owner 去管理 (一般还是 restart 这个 job)。这些信息都会同步到对应的 world_size 和 rank 上,所以 training script 可能更新的时候需要确认world_size 和 rank 没有被改变。
整体流程
正常启动流程
假设你现在有3台互联的机器,需要在一台机器上启动etcd,作为monitor
etcd --enable-v2
--listen-client-urls http://0.0.0.0:2379,http://127.0.0.1:4001
--advertise-client-urls PUBLIC_HOSTNAME:2379
在其他两台机器上运行训练代码
python -m torchelastic.distributed.launch
--nnodes=MIN_SIZE:MAX_SIZE
--nproc_per_node=TRAINERS_PER_NODE
--rdzv_id=JOB_ID
--rdzv_backend=etcd
--rdzv_endpoint=ETCD_HOST:ETCD_PORT
YOUR_TRAINING_SCRIPT.py --arg1 # train script args
这里:
– nnodes 是指运行分布式训练的 node 数,如果是一个数值,整个训练过程会保持这个 node 数。如果是一个区间就代表这个训练的 node 数可变。(注意:如果设置了 node 可变的训练代码需要做一些适配,ElasticDistributedSampler )。
–rdzv_id 是这个job 的一个名字,不同的worker这个需要相同,
–rdzv_backend 是指 monitor 的后端默认是 etcd,
–rdzv_endpoint 是 monitor 的 ip 和 端口。
下图为运行逻辑示意图:
![](v2-fee94053955501d8a013b978ca9a57b0_b.png)
当 job 起来之后,训练进程(下称 process)会自己跑自己的训练代码,跟没有 agent 一样,只是在开始的时候 agent 会把原来一些 torch.distributed.launch 做的工作做了,然后传给训练的进程。agent 会定时触发类似一个存活指针的验证流程(默认是 30s 触发一次),查看process的状态,如果没有报错,会作一次 rendezvous 的动作,同时会同步现在进程的状态(下称status)到 monitor 上。
node 上 training process 报错
如果 node 上一个或者多个 process 报错,agent 会重启这个 node 下所有的 process。训练代码需要自己完成加载最近 checkpoint 功能。agent 会把 monitor 上的 status 同步到该 node 上所有的process (如 epoch, optimizer, accuracy 等),继续原来的训练。所以在重启的worker 会一直跑,跑到分布式的 all_reduce的时候,一起更新权重这样权重就一致了, 然后state.epoch 下也会在新的一次 epoch 更新一致。
node/agent 报错
如果是 node 或者是 node 上 agent 报错,上述的 rendezvous 动作会被阻塞(下称 barrier),如果阻塞的时间在允许范围内重新连接了,rendezvous 会继续。如果时间超过允许范围了,会从这个 job 中排除这个 node,然后重启所有 node 上所有的 processes。读取对应的 checkpoint,和根据 monitor state 恢复最近checkpoint 的状态。
Kubenetes 支持
上述的流程一般需要一个更高级的owner 去管理这个job(agent 挂了需要重启这个node等),默认是 k8s。所以 elastic 也实现了管理 elasticjob CRD 的 controller。
api/v1alpha1/elasticjob_types.go
type ElasticJobSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
RunPolicy common.RunPolicy `json:",inline"`
// +kubebuilder:validation:MinItems=1
ReplicaSpecs map[common.ReplicaType]*common.ReplicaSpec `json:"replicaSpecs"`
RdzvEndpoint string `json:"rdzvEndpoint"`
// +kubebuilder:validation:Minimum=1
MinReplicas *int32 `json:"minReplicas,omitempty"`
MaxReplicas *int32 `json:"maxReplicas,omitempty"`
}
// ElasticJobStatus defines the observed state of ElasticJob
type ElasticJobStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
common.JobStatus `json:",inline"`
}
// ElasticJob is the Schema for the elasticjobs API
type ElasticJob struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec ElasticJobSpec `json:"spec,omitempty"`
Status ElasticJobStatus `json:"status,omitempty"`
}
尝试启用
实测使用 class-vion.yaml,需要先启动 etcd
➜ samples git:(master) ✗ kubectl apply -f etcd.yaml
➜ samples-10 git:(master) ✗ kubectl apply -f classy-vision.yaml
config/samples/class-vision.yaml
apiVersion: elastic.pytorch.org/v1alpha1
kind: ElasticJob
metadata:
name: classy-vision
namespace: elastic-job
spec:
# Use "etcd-service:2379" if you already apply etcd.yaml
rdzvEndpoint: "etcd-service:2379"
minReplicas: 1
maxReplicas: 2
replicaSpecs:
Worker:
replicas: 2
restartPolicy: ExitCode
template:
apiVersion: v1
kind: Pod
spec:
containers:
- name: elasticjob-worker
image: torchelastic/examples:0.2.0
imagePullPolicy: Always
args:
- "--nproc_per_node=1"
- "/workspace/classy_vision/classy_train.py"
- "--config_file"
- "/workspace/classy_vision/configs/template_config.json"
# number of data loader workers (NOT trainers)
# zero means load the data on the same process as the trainer
# this is set so that the container does not OOM since
# pytorch data loaders use shm
- "--num_workers=0"
查看相关资源
➜ samples git:(master) ✗ kubectl -n elastic-job get elasticjobs
NAME AGE
classy-vision 9d
➜ samples git:(master) ✗ kubectl -n elastic-job get po
NAME READY STATUS RESTARTS AGE
classy-vision-worker-0 0/1 Completed 0 8d
classy-vision-worker-1 0/1 Completed 0 8d
etcd 1/1 Running 0 9d
➜ samples git:(master) ✗ kubectl -n elastic-job get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
classy-vision-worker-0 ClusterIP None <none> <none> 9d
classy-vision-worker-1 ClusterIP None <none> <none> 9d
etcd-service ClusterIP 10.254.74.130 <none> 2379/TCP 9d
上面显示了起一个 elastic-job 创建的资源,整个训练的流程会需要预先创建一个 etcd service 服务,然后创建 elasticjob,整个流程跟 pytorch-job 很类似,也是会为每个运行实例(下称 worker )创建 pod 和 service。结束会回收资源等。agent 自己是以进程的形式运行在 worker 中。
对比kubeflow/pytorch-operator
整体上说,pytorch/elastic 跟 kubeflow/pytorch-operator 使用是比较类似的,需要做一些适应性调整可以跟 kubeflow/pytorch-operator 保持行为一致,如:
elastic/kubernetes/controllers/pod.go
// Set pod environment set for ElasticJob
func SetClusterSpecForPod(job interface{}, podTemplate *corev1.PodTemplateSpec) error {
...
launchDefaultArgs := []string{
"--rdzv_backend=etcd",
"--rdzv_endpoint=" + elasticJob.Spec.RdzvEndpoint,
"--rdzv_id=" + elasticJob.Name,
"--nnodes=" + strconv.Itoa(int(minReplicas)) + ":" + strconv.Itoa(int(maxReplicas))}
for i := range podTemplate.Spec.Containers {
podTemplate.Spec.Containers[i].Args = append(launchDefaultArgs, podTemplate.Spec.Containers[i].Args...)
}
return nil
}
直接在 controller 里面添加运行代码参数 pod.spec.container.args,然后在 elasticjob.yaml 中的 pod.spec.container[i].args 只填了一部分。 在 pytorch-operator 是需要把全部的命令填写完整的。
局限
直接使用 etcd 作为 key-value store 感觉有点杀鸡用牛刀了,而且一般只会起一个 etcd 的服务器(如果集群内跑的elastic-job 不是很多,起多个会导致资源浪费),只起一个没办法很完整地使用上 etcd 高可用的特性。pytorch/elastic 提供了相关的RendezvousHandler接口,可以使用不同的key-value store server。
#### 适应性调整
自己的想法:另外写一个 crd 把每次同步的信息,作为 status。然后 RendezvousHandler 下面的api,通过 k8s client 跟读写 crd 的 status 信息。
class RendezvousHandler(abc.ABC):
@abc.abstractmethod
# pyre-fixme[11]: Annotation `Store` is not defined as a type.
# pyre-fixme[10]: Name `torch` is used but not defined.
def next_rendezvous(self) -> Tuple["torch.distributed.Store", int, int]:
"""
每隔一段时间要做一次的同步
"""
pass
@abc.abstractmethod
def is_closed(self) -> bool:
pass
@abc.abstractmethod
def set_closed(self):
pass
@abc.abstractmethod
def num_nodes_waiting(self) -> int:
"""
表示有多少的 nodes 已经跑到 barrier 那里了。
"""
pass
@abc.abstractmethod
def get_run_id(self) -> str:
"""
获取这个 job-id,相当于这个job的标识
"""
pass
相当于替换掉etcd,信息通过k8s 的资源 status 同步。如果使用这个方法,是会比较 k8s native 一点,不过需要给不同的 worker k8s client 的权限,这个可能会引入安全性的问题。
结论
整体上,pytorch/elastic 的只实现了一部分的弹性,可以缩容不可以扩容。死掉的 worker 如果超时了,即使重启后也不能再加进去。另外,默认使用 etcd 和一些不是很直观的使用方法,还不是很方便。不过,有 pytorch 官方背书,也确认 elastic 不负责 checkpoint reload,需要用户自己解决这一点,我感觉是合理的。
后续 PyTorch Elastic 在 Kubeflow 中的支持在 https://github.com/pytorch/elastic/issues/117 中跟踪。