目录

ElasticDL 弹性分布式研读

项目介绍

  ElasticDL 是蚂蚁金服开源的一个基于 TensorFlow 2.0 eager execution 和 Kubernetes 的弹性分布式深度学习框架。ElasticDL 没有像 Kubeflow 和 pytorch/elastic 那样选择开发 Kubernetes Operator,整个 elasticDL 跟 Kubernetes 的沟通主要依赖 k8s 的 client 进行对简单的对象(pod,job)进行管理。ElasticDL 通过在 Kubernetes 上创建 master 进程来控制深度学习训练作业的弹性调度。注意下面内容针对的是 Elastic 0.2.0 rc2的版本进行探讨的。

对 Tensorflow 2.x 的支持

  我们知道 TensorFlow 2.x eager mode 采用和 TensorFlow 1.x 解释执行图完全不同的深度学习计算方式。前向计算过程把对基本计算单元(operator)的调用记录在一个内存数据结构 tape 里, 随后反向计算过程(计算 gradients)可以回溯这个 tape,以此调用 operator 对应的 gradient operator。 ElasticDL 会通过 tape 获取 gradient 后,可以使用 Parameter Server 或者 AllReduce 分布式策略来更新模型参数。

系统架构

​  ElasticDL 的 master 会根据数据索引将数据分片,为每个数据分片创建一个 task。然后 master 会调用 Kubernetes API 启动多个 worker 进程。每个 worker 启动后,会向 master 请求 task。worker 收到来自 master 分发的 task 后,会读取 task 对应的数据分片来前向计算和梯度计算。

​ 同时,master 会通过 Kubernetes API 监听集群中每个 worker 的状态。当有 worker 被高优先级作业抢占后,master 会回收该 worker 的未完成 task,然后重新分发给其他的 worker。同时 master 会尝试通过 Kubernetes API重新拉起被抢占的 worker。等到资源充足时,worker 进程会被重新启动,并加入训练作业,整体流程如下图所示。

准备模型和相关函数

  用户需要在 model_zoo 的文件夹下把 tensorflow 模型和相关的训练测试组件定义好,如:优化器 optimizer,损失函数 loss function,数据集 dataset,验证性能函数 eval_metrics_fn 等,下面是定义一个训练的例子。整体上, 如果是已实现的模型,可能需要按照他们的方式重新定义训练一次。

model_zoo/cifar10/xxx_functional_api.py

def custom_model() -> tf.keras.Model
def loss() -> tf.loss
def callbacks()
def optimizer() -> tf.optimizers
def dataset_fn() -> tf.data.Dataset
def eval_metrics_fn() -> dict(string:tf.keras.metrics)

然后需要把你这些模型相关的定义文件copy到镜像里面,推上仓库。

cd ${CODE_PATH}/elasticdl/model_zoo
elasticdl zoo init
elasticdl zoo build --image=${DOCKER_HUB_REPO}/elasticdl:mnist .
elasticdl zoo push ${DOCKER_HUB_REPO}/elasticdl:mnist

运行 elastic 客户端

这步对应上图(ElasticDL Client)

elasticdl train \
 --image_name=elasticdl:mnist \
 --model_zoo=model_zoo \
 --model_def=mnist.mnist_functional_api.custom_model \
 --training_data=/data/mnist/train \
 --job_name=test-mnist \
 --volume="host_path=/data,mount_path=/data"

这一步主要干了以下一些工作:

  • 通过 k8s client 使用前面打好的镜像启动 master pod ,pod 的运行命令为:

python -m elasticdl.python.master.main -xxx

Master.main 主要做了以下工作

  • 设置这个 job 训练的模型(model)
  • 启动 task_dispatcher,用于分割数据集,然后告诉不同的 worker 读取dataset 不同的部分,通过传递index实现。
  • 启动 instance_manager 用于管理worker 和 ps 的生命周期,instance_manager 通过调用 k8s 的客户端启动 parameter server 和 workers 的 pod ,同时记录 ps, worker 的 ID。
  • 启动 master_service 和 ps_service,前者是用于响应 worker get_task 的request 的,后者用于同步 worker 过来的梯度 gradient 做 all_reduce等动作。(这里的两个server 好像是在一起的,没有另外起pod)
  • 启动好的 worker 使用 get_task 从 master service 轮询 task id,做完前向运算(下称 forward),后向运算(下称 backward) 之后把梯度更新到 PS server 上,调用流程如下图。
  • 直到没有 task 或者 所有的 worker 都 failed 才会停止。

数据分发

  数据需要处理成多个 chunk (默认支持的是多个 tfrecords)。master 会把数据看成一个 record list。把不同的 record_index 组合成不同的 task,分给不同的 workers。这里需要不同的 worker 都能看到这份数据。workers 会把读取的数据,塞进 models,获取梯度 gradient,在 parameter server 上更新,数据分发/读取如下图所示。

弹性机制实现

  ElasticDL 默认就是需要在 k8s 上使用的,不过整体上没有相关的资源CRD,也没有对应的operator 操作资源。基本上使用的是 k8s 最原生的基础资源 (pod, job, service 等),elasticDL 的弹性是通过 k8s 不同资源的优先级实现的。

删除 Worker

  如果有其他的 workload 优先级比现在跑的 jobs 高,就会把 workers 的 pod 删掉。因为这些 pod 是没有对应的 owner 的,死掉就死掉了,没有完成的 tasks 因为会超时,会从 Doing -> TODO 重新入队。

增加 Worker

  Master 会利用 InstanceManager 一直 watch namespace 下 pods 的状态,如果有空闲,就会 relaunch workers。 worker 起来后又会主动向 master get_task。如下面1图到2图是增加了一个 worker 之后,task 的分配。3图是有 worker 退出后的情况。

总结

  我个人的感觉是 ElasticDL 是一个比较独立的系统,Tensorflow 的部分和 Kubernetes 的部分耦合在一起了,后续支持新的框架和新的调度方式有难度。在项目里直接拿过来用在 k8s 的系统上使用,很容易会跟k8s 调用逻辑有冲突(如需要自己打镜像再推镜像,然后自己去尝试重启死掉的pod 等的操作,看起来都不是很native)。而且使用上对用户有一定的学习成本,并且不是所有tensorflow 定义的模型都可以比较顺利的跑起来,感觉只支持tensorflow 创建出来模型的一个子集。

  不过 ElasticDL 某程度上提供一种在 k8s 上分布式训练的思路吧,ElasticDL 算是把 tensorflow 的数据预处理的部分和梯度更新部分使用自己的方法实现了一次,让其在 k8s 上调度会更加的灵活(相当于调度的粒度是一个batch的数据和梯度)。 tf-operator 和 pytorch-operator 只负责把容器起起来,然后让训练的程序在上面跑,数据的读取和梯度的整合都需要程序描述好(某程度上对外部的调度系统(k8s)是不感知的),所以不同worker 之间的动态联动也是有限的,调度的粒度为 pod。

参考