ElasticDL 弹性分布式研读
项目介绍
ElasticDL 是蚂蚁金服开源的一个基于 TensorFlow 2.0 eager execution 和 Kubernetes 的弹性分布式深度学习框架。ElasticDL 没有像 Kubeflow 和 pytorch/elastic 那样选择开发 Kubernetes Operator,整个 elasticDL 跟 Kubernetes 的沟通主要依赖 k8s 的 client 进行对简单的对象(pod,job)进行管理。ElasticDL 通过在 Kubernetes 上创建 master 进程来控制深度学习训练作业的弹性调度。注意下面内容针对的是 Elastic 0.2.0 rc2的版本进行探讨的。
对 Tensorflow 2.x 的支持
我们知道 TensorFlow 2.x eager mode 采用和 TensorFlow 1.x 解释执行图完全不同的深度学习计算方式。前向计算过程把对基本计算单元(operator)的调用记录在一个内存数据结构 tape 里, 随后反向计算过程(计算 gradients)可以回溯这个 tape,以此调用 operator 对应的 gradient operator。 ElasticDL 会通过 tape 获取 gradient 后,可以使用 Parameter Server 或者 AllReduce 分布式策略来更新模型参数。
系统架构
ElasticDL 的 master 会根据数据索引将数据分片,为每个数据分片创建一个 task。然后 master 会调用 Kubernetes API 启动多个 worker 进程。每个 worker 启动后,会向 master 请求 task。worker 收到来自 master 分发的 task 后,会读取 task 对应的数据分片来前向计算和梯度计算。
同时,master 会通过 Kubernetes API 监听集群中每个 worker 的状态。当有 worker 被高优先级作业抢占后,master 会回收该 worker 的未完成 task,然后重新分发给其他的 worker。同时 master 会尝试通过 Kubernetes API重新拉起被抢占的 worker。等到资源充足时,worker 进程会被重新启动,并加入训练作业,整体流程如下图所示。

准备模型和相关函数
用户需要在 model_zoo 的文件夹下把 tensorflow 模型和相关的训练测试组件定义好,如:优化器 optimizer,损失函数 loss function,数据集 dataset,验证性能函数 eval_metrics_fn 等,下面是定义一个训练的例子。整体上, 如果是已实现的模型,可能需要按照他们的方式重新定义训练一次。
model_zoo/cifar10/xxx_functional_api.py
def custom_model() -> tf.keras.Model
def loss() -> tf.loss
def callbacks()
def optimizer() -> tf.optimizers
def dataset_fn() -> tf.data.Dataset
def eval_metrics_fn() -> dict(string:tf.keras.metrics)
然后需要把你这些模型相关的定义文件copy到镜像里面,推上仓库。
cd ${CODE_PATH}/elasticdl/model_zoo
elasticdl zoo init
elasticdl zoo build --image=${DOCKER_HUB_REPO}/elasticdl:mnist .
elasticdl zoo push ${DOCKER_HUB_REPO}/elasticdl:mnist
运行 elastic 客户端
这步对应上图(ElasticDL Client)
elasticdl train \
--image_name=elasticdl:mnist \
--model_zoo=model_zoo \
--model_def=mnist.mnist_functional_api.custom_model \
--training_data=/data/mnist/train \
--job_name=test-mnist \
--volume="host_path=/data,mount_path=/data"
这一步主要干了以下一些工作:
- 通过 k8s client 使用前面打好的镜像启动 master pod ,pod 的运行命令为:
python -m elasticdl.python.master.main -xxx
Master.main 主要做了以下工作
- 设置这个 job 训练的模型(model)
- 启动 task_dispatcher,用于分割数据集,然后告诉不同的 worker 读取dataset 不同的部分,通过传递index实现。
- 启动 instance_manager 用于管理worker 和 ps 的生命周期,instance_manager 通过调用 k8s 的客户端启动 parameter server 和 workers 的 pod ,同时记录 ps, worker 的 ID。
- 启动 master_service 和 ps_service,前者是用于响应 worker get_task 的request 的,后者用于同步 worker 过来的梯度 gradient 做 all_reduce等动作。(这里的两个server 好像是在一起的,没有另外起pod)
- 启动好的 worker 使用 get_task 从 master service 轮询 task id,做完前向运算(下称 forward),后向运算(下称 backward) 之后把梯度更新到 PS server 上,调用流程如下图。
- 直到没有 task 或者 所有的 worker 都 failed 才会停止。

数据分发
数据需要处理成多个 chunk (默认支持的是多个 tfrecords)。master 会把数据看成一个 record list。把不同的 record_index 组合成不同的 task,分给不同的 workers。这里需要不同的 worker 都能看到这份数据。workers 会把读取的数据,塞进 models,获取梯度 gradient,在 parameter server 上更新,数据分发/读取如下图所示。

弹性机制实现
ElasticDL 默认就是需要在 k8s 上使用的,不过整体上没有相关的资源CRD,也没有对应的operator 操作资源。基本上使用的是 k8s 最原生的基础资源 (pod, job, service 等),elasticDL 的弹性是通过 k8s 不同资源的优先级实现的。
删除 Worker
如果有其他的 workload 优先级比现在跑的 jobs 高,就会把 workers 的 pod 删掉。因为这些 pod 是没有对应的 owner 的,死掉就死掉了,没有完成的 tasks 因为会超时,会从 Doing -> TODO 重新入队。
增加 Worker
Master 会利用 InstanceManager 一直 watch namespace 下 pods 的状态,如果有空闲,就会 relaunch workers。 worker 起来后又会主动向 master get_task。如下面1图到2图是增加了一个 worker 之后,task 的分配。3图是有 worker 退出后的情况。

总结
我个人的感觉是 ElasticDL 是一个比较独立的系统,Tensorflow 的部分和 Kubernetes 的部分耦合在一起了,后续支持新的框架和新的调度方式有难度。在项目里直接拿过来用在 k8s 的系统上使用,很容易会跟k8s 调用逻辑有冲突(如需要自己打镜像再推镜像,然后自己去尝试重启死掉的pod 等的操作,看起来都不是很native)。而且使用上对用户有一定的学习成本,并且不是所有tensorflow 定义的模型都可以比较顺利的跑起来,感觉只支持tensorflow 创建出来模型的一个子集。
不过 ElasticDL 某程度上提供一种在 k8s 上分布式训练的思路吧,ElasticDL 算是把 tensorflow 的数据预处理的部分和梯度更新部分使用自己的方法实现了一次,让其在 k8s 上调度会更加的灵活(相当于调度的粒度是一个batch的数据和梯度)。 tf-operator 和 pytorch-operator 只负责把容器起起来,然后让训练的程序在上面跑,数据的读取和梯度的整合都需要程序描述好(某程度上对外部的调度系统(k8s)是不感知的),所以不同worker 之间的动态联动也是有限的,调度的粒度为 pod。