天行九歌 2018-12-18
在深度学习场景中,数据集的大小和模型的复杂性都在增加。加速他们的训练是一项重大挑战,需要超级计算所能满足的更大的计算资源需求。
在这篇文章中,我们将探讨如何将单个深度神经网络(DNN)的训练分布在多个gpu和服务器上,以减少训练时间。我们将使用 Estimators作为TensorFlow API,使用Horovod作为分布方法。
深度神经网络(DNN)的成功基础是构建具有数百万参数的高学习容量模型,这些参数以数据驱动的方式进行调整。通过处理数百万个示例来训练这些模型,使得更精确算法的开发通常受到训练它们的计算设备的吞吐量的限制。
因此,我们使用了分布式训练的方法。这种计算范式背后的主要思想是并行运行任务,而不是像在单机中那样串行运行任务。
1.1并行计算机平台
DNN通常是计算密集型的,使其类似于传统的超级计算(高性能计算,HPC)应用程序。因此,大型学习工作负载在加速系统上表现得非常好,例如在超级计算领域中使用的通用图形处理单元(GPU)。此外,与超级计算机类似,带有GPU等加速器的分布式内存架构已成为当今各种规模DNN的默认选项。正如我们将在本文后面看到的那样,我们可以通过使用许多超级计算基础设施来缩短DNN的训练时间。
1.2性能指标
在这篇文章中,我们将重点关注在多个设备(如GPU)上并行运行训练的分布式训练,以便使训练过程更快,我们将需要一些性能指标来衡量它。
这些系统中的性能一词有双重解释。一方面,它指的是深度学习模型的预测准确性。另一方面,是计算速度。
准确度与平台无关,它是比较多个模型的性能指标,而计算速度取决于部署模型的平台,在本文中,我们将通过以下指标对其进行测量:
这些指标将高度依赖集群配置、使用的网络类型或使用库和管理资源的框架的效率。
1.3并行性的类型
为了实现训练步骤的分配,有两个主要实现,它将取决于应用程序的需求,以了解哪个将更好地执行,或者两种方法的混合是否可以提高性能。
例如,深度学习模型中的不同层可以在不同的GPU上并行训练。该训练过程通常称为模型并行性。另一种方法是数据并行,其中我们对每个执行单元使用相同的模型,但是使用不同的训练样本在每个计算设备中训练模型。
在这种模式下,训练数据被分成多个子组,并且在不同的节点(工作节点)中,它们中的每一个都在相同的复制模型上运行。这些将需要在批处理计算结束时同步模型参数(或其“梯度”),以确保它们正在训练一致的模型(就像算法在单个处理器上运行一样),因为每个设备将独立计算其对其训练样本的预测与标记输出(这些训练样本的正确值)之间的误差。因此,每个设备都必须将所有的更改发送给所有其他设备的模型。
这个设置的一个有趣的特性是,它将根据可用数据的数量进行伸缩,并加快整个数据集对优化的贡献速度。此外,节点之间的通信更少,因为它受益于每个权重的大量计算。另一方面,该模型必须完全适合每个节点,主要用于具有大数据集的卷积神经网络的快速计算。
在这种情况下(也称为网络并行),模型将被分段为可以并发运行的不同部分,并且每个部分将在不同节点中的相同数据上运行。该方法的可扩展性取决于算法的任务并行化程度,并且实现比前一个更复杂。它可能会降低通信需求,因为工作节点只需要同步共享参数(通常每个正向或反向传播步骤一次),并且适用于共享高速总线的单个服务器中的GPU。它可以用于更大的模型,因为每个节点的硬件限制不再是限制。
因此,通常,算法的并行化比在具有数据子集的不同节点中运行相同模型更复杂。
在这篇文章中,我们将关注数据并行方法。
在分布式环境中,可能存在独立运行的随机梯度下降(SGD)的多个实例,因此必须调整整体算法并且应考虑与模型一致性或参数分布相关的不同问题。
2.1同步与异步分布式训练
随机梯度下降(SGD)是一种迭代算法,涉及多轮训练,其中每轮的结果被合并到模型中以准备下一轮。轮可以同步或异步在多个设备上运行。
每个SGD迭代都在一小批训练样本上运行。在同步训练中,所有设备使用来自单个(大)小批量的不同数据部分来训练他们的本地模型。然后,他们将本地计算的梯度(直接或间接)传达给所有设备。只有在所有设备成功计算并发送其梯度后,才会更新模型。然后将更新的模型与下一个小批量的拆分一起发送到所有节点。
在异步训练中,没有设备等待来自任何其他设备的模型更新。这些设备可以独立运行并以对等方式共享结果,或通过称为“参数”服务器的一个或多个中央服务器进行通信。在对等体系结构中,每个设备运行一个循环,该循环读取数据,计算梯度,(直接或间接)将它们发送到所有设备,并将模型更新为最新版本。
根据这项来自苏黎世ETH (Zurich)的调查,在实际实现中,这些方法对于最多32-50个节点是同步的,对于较大的集群和异构环境是异步的。在这篇文章中,我们将关注一种同步训练方法。
2.2同步训练中的参数分配和通信
对于同步训练,我们可以选择两种主要方案:集中式或分散式。为DNN训练设计集中式和分散式方案之间的选择取决于多种因素,包括网络拓扑,带宽,通信延迟,参数更新频率或所需的容错。
集中式方案通常包括所谓的参数服务器策略。
当并行SGD使用参数服务器时,算法首先将模型广播给工作节点(设备)。在每次训练迭代中,每个工作节点从小批量中读取其自己的拆分,计算其自己的梯度,并将这些梯度发送到一个或多个参数服务器。参数服务器聚合来自工作节点的所有梯度,并等到所有工作人员完成后才计算下一次迭代的新模型,然后广播给所有工作节点。
分散式方案将依赖于ring-allreduce来在节点之间传递参数更新。在ring-allreduce架构中,没有中央服务器聚合来自工作节点的梯度。相反,在训练迭代中,每个工作节点为小批量读取其自己的拆分,计算其梯度,将其梯度发送到环上的后继邻居,并从环上的其上一个邻居接收梯度。
Uber Engineering推出了Michelangelo,这是一个内部的“以服务为本”(mx -as-a-service)平台,可以方便地大规模构建和部署这些系统。Horovod是Michelangelo的一个组件,是TensorFlow的一个开源分布式训练框架,它的目标是通过ring-allreduce使分布式深度学习快速且易于使用,只需要对用户代码进行几行修改。
3.1 Horovod概述
Horovod是使用pip安装的,它需要事先安装Open MPI和Nvidia的NCCL-2库来支持GPU间通信。Horovod 在Apache 2.0许可下可用。
从概念上讲,Horovod下的数据并行分布式训练范例很简单:
1.运行多个训练脚本副本,每个副本:
2.这些多份副本之间的平均梯度
3.更新模型
4.重复(从步骤1开始)
Horovod采用百度算法对平均梯度进行平均,并将这些梯度传递给遵循ring-allreduce分散方案的所有节点(上面的步骤2和3)。该算法基于Patarasuk和Yuan在2009年论文(http://www.cs.fsu.edu/~xyuan/paper/09jpdc.pdf)中介绍的方法。Horovod用NCCL-2取代了百度ring-allreduce实现,这是NVIDIA的collective communication库,可在多台机器上提供高度优化的ring-allreduce版本。
Sergeev和Balso的论文(https://arxiv.org/pdf/1802.05799.pdf)中的下图显示了ring-allreduce算法,该算法允许工作节点平均梯度并将它们分散到所有节点,而无需使用参数服务器的集中式方案。
在该ring-allreduce算法中,N个节点中的每一个与其两个对等体通信2 *(N-1)次。在此通信期间,节点发送和接收数据缓冲区的块。在前N-1次迭代中,接收的值被添加到节点缓冲区中的值。在第二次N-1次迭代中,接收的值替换节点缓冲区中保存的值。Patarasuk和Yuan建议该算法是带宽最优的,这意味着如果缓冲区足够大,它将最佳地利用可用网络。
3.2估算器中的用法
它引入了一个hvd必须初始化的对象,并且必须包装优化器(hvd使用allreduce或allgather平均梯度)。GPU使用其本地rank绑定到此进程,并且我们在初始化期间将rank 0中的变量广播到所有其他进程。
使用该mpirun命令启动Horovod Python程序。它将每个服务器的主机名以及每台服务器上使用的GPU数作为参数。
要在Tensorflow中使用Horovod和估算器,您必须对程序进行以下添加:
1. 导入Horovod:
import horovod.tensorflow as hvd
2. Horovod必须在开始之前初始化:
hvd.init()
3.将服务器GPU固定在此进程中使用。对于每个进程一个GPU的典型设置,可以将其设置为local rank。在这种情况下,服务器上的第一个进程将分配第一个GPU,第二个进程将分配第二个GPU,依此类推。
config = tf.ConfigProto() config.gpu_options.allow_growth = True config.gpu_options.visible_device_list = str(hvd.local_rank()) estimator = tf.estimator.Estimator( model_fn=model_fn, model_dir=model_dir, config=tf.estimator.RunConfig(session_config=config))
4.在hvd.DistributedOptimizer中包含优化器。分布式优化器将梯度计算委托给原始优化器,使用allreduce或allgather平均渐变,然后应用这些平均梯度。
5.添加hvd.BroadcastGlobalVariablesHook(0)以将rank 0的初始变量状态广播到所有其他进程。这对于确保在使用随机权重开始训练或从检查点恢复时所有工作节点的一致初始化是必要的。
6.按工作节点数量调整学习率。同步分布式训练中的有效批量大小按工作节点数量进行调整。学习率的提高可以补偿增加的批量。你可以在Facebook的论文中了解更多相关信息。
7.修改您的代码,使其仅保存工作节点0上的检查点,以防止其他工作人员破坏检查点
model_dir=None if hvd.rank() != 0 tf.estimator.Estimator( model_fn=model_fn, model_dir=model_dir)
8.在单GPU中运行评估更好
if hvd.rank() == 0: estimator.evaluate()
3.3运行Horovod
要在具有4个GPU的计算机上运行,我们将使用它mpirun 来运行python脚本:
要在4台机器上运行,每台机器有4个GPU(16个GPU):
这些示例适用于Open MPI。检查MPI文档以获取系统上mpirun命令的参数。
添加-bind-to none和-map-by slot参数非常重要。
使用-x选项,您可以为所有worker指定(-x NCCL_DEBUG = INFO)或复制(-x LD_LIBRARY_PATH)环境变量。
我们已经在Cifar10上使用ResNet_v2_101测试了Horovod,在巴塞罗那超级计算中心(BSC-CNS)的CTE IBM Power9超级计算机上最多16个节点和4个GPU 。
4.1超级计算机:系统概述
CTE-POWER是基于IBM Power9处理器的集群,具有Linux操作系统和Infiniband互连网络。它有54个计算节点,每个节点:
超级计算机Marenostrum - 巴塞罗那超级计算中心的POWER-CTE集群
4.2准备我们的帐户
为了简化并行实验,由于超级计算机没有互联网访问,我们需要准备我们的帐户(假设帐户是“ sam14001”),可以访问预加载的数据集和深度学习模型:
数据集:CIFAR10
我们预装了CIFAR-10数据集(http://www.cs.toronto.edu/~kriz/cifar.html)。CIFAR-10是用于对象识别的已建立的计算机视觉数据集。它是8000万个图像数据集的一个子集,由60,000个32x32彩色图像组成,包含10个对象类别中的一个,每个类别有6000个图像。它由Alex Krizhevsky,Vinod Nair和Geoffrey Hinton收集。
可用模型
在我们的案例中,我们预装了VGG,Inception和ResNet预训练模型。
Simonyan和Zisserman在2014年的论文“ Very Deep Convolutional Networks for Large Scale Image Recognition”中介绍了VGG网络架构。该网络的特征在于其简单性,仅使用3×3卷积层以增加的深度堆叠在彼此之上。减小卷大小由最大池化处理。两个全连接层,每个具有4,096个节点,然后是softmax分类器。我们预装了两个VGG模型:
“16”和“19”代表网络中的权重层数。
Inception微架构最初是由Szegedy等人在2014年的论文《深入卷积》中提出的。Inception模块的目的是通过计算充当“多级特征提取” 1×1 ,3×3 ,和5×5的卷积(同一网络的模块)-然后这些滤波器的输出沿通道尺寸堆叠,然后送入网络的下一层。这种架构的原始版本称为GoogLeNet,但后续的表现形式简称为INCEPTION vN,其中N是指Google推出的版本号。我们预装了:
ResNet是一种“exotic architecture”,依赖于微架构模块,由He等人在2015年的论文《Deep Residual Learning for Image Recognition》中介绍。我们预装了:
“50”,“101”,“152”和“200”代表网络中的权重层数。
4.3如何在CTE Power9上运行脚本
修改python脚本后,必须创建作业脚本(.sh文件)以提交作业。在我们的例子中,我们假设您将使用SLURM工作负载管理器。
提交作业的方法是sbatch直接使用SLURM 指令。作业是SLURM的执行单元,它由脚本(.sh文件)定义,该脚本包含一组描述作业要求的指令以及要执行的命令。这些指令在作业脚本中显示为注释。我们将在本指南中使用的作业脚本如下所示:
在我们的示例中,必须为每个节点请求相同的gpu (- gres gpu:)和任务。MPI进程的数量等于节点和每个节点的GPU的乘积,因为每个进程将分配一个GPU。
这可能是我们的作业脚本,运行4个进程与mpirunin一个节点,4个gpu和一个小时的时间限制:
观察:
gres gpu是每个节点的GPU。tasks-per-node和gres gpu必须相同。
该mpirun的标志-np必须是nodes*tasks-per-node。
Python的标志:
这些是使用SLURM提交和监视作业的基本指令:
sbatch <job_script>
将作业脚本提交到队列系统。
squeue
显示所有提交的作业及其ID。
scancel <job_id>
从队列系统中删除作业,取消进程的执行(如果它们仍在运行)。
另一个重要的事情是必须加载所需的模块和虚拟环境。我们预先建立了一个名为的文件init_gpu_p9.sh,我们可以使用以下命令运行它:
source /gpfs/projects/sam14/mt_p9/init_gpu_p9.sh
在BSC 的CTE IBM Power9集群的用户指南中,您可以找到更详细的信息。
4.4代码:Estimator
我们将使用的估算器是在执行4.2节中描述的上一步骤之后预加载的。
import os import sys import time import json import tensorflow as tf import horovod.tensorflow as hvd import args_parser from datasets import cifar10 from nets import nets_factory from utils import hooks class Model(object): """Class that defines a graph for image classification.""" def __init__(self, params, training): self.network_fn = nets_factory.get_network_fn( params.model_name, num_classes=params.num_classes, is_training=training) def __call__(self, inputs): logits, end_points = self.network_fn(inputs) return logits, end_points def model_fn(features, labels, mode, params): """The model_fn argument for creating an Estimator.""" global_step = tf.train.get_global_step() model = Model(params, training=(mode == tf.estimator.ModeKeys.TRAIN)) images = features tf.summary.image('images', images, max_outputs=3) logits, end_points = model(images) predictions = tf.argmax(logits, axis=1) #probabilities = tf.nn.softmax(logits) with tf.name_scope('loss'): cross_entropy = tf.losses.sparse_softmax_cross_entropy( labels=labels, logits=logits, scope='xent_loss') tf.summary.scalar('xent_loss', cross_entropy) with tf.name_scope('accuracy'): accuracy = tf.metrics.accuracy( labels=labels, predictions=predictions, name='acc') tf.summary.scalar('accuracy', accuracy[1]) if mode == tf.estimator.ModeKeys.EVAL: return tf.estimator.EstimatorSpec( mode=mode, loss=cross_entropy, eval_metric_ops={'accuracy/accuracy': accuracy}) with tf.name_scope('train_op'): # Horovod: add Horovod Distributed Optimizer. # Note: Allgather allocates an output tensor which is proportionate to # the number of processes participating in the training. If you find # yourself running out of GPU memory, you can force allreduce to happen # on CPU by passing `device_sparse='/cpu:0'`. # optimizer = hvd.DistributedOptimizer(optimizer, device_dense='/cpu:0') optimizer = tf.train.GradientDescentOptimizer(learning_rate=params.learning_rate) optimizer = hvd.DistributedOptimizer(optimizer) train_op = optimizer.minimize(cross_entropy,global_step=global_step) train_hook_list = [] train_tensors_log = {'accuracy': accuracy[1], 'loss': cross_entropy, 'global_step': global_step} train_hook_list.append(tf.train.LoggingTensorHook( tensors=train_tensors_log, every_n_iter=params.log_every_n_steps)) # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states from # rank 0 to all other processes. This is necessary to ensure consistent # initialization of all workers when training is started with random weights or # restored from a checkpoint. train_hook_list.append(hvd.BroadcastGlobalVariablesHook(0)) # Hook to print examples per second. Hook defines in utils->hooks train_hook_list.append(hooks.ExamplesPerSecondHook( batch_size=params.batch_size, warm_steps=10, metric_logger=None, every_n_steps=params.log_every_n_steps)) if mode == tf.estimator.ModeKeys.TRAIN: return tf.estimator.EstimatorSpec( mode=mode, loss=cross_entropy, train_op=train_op, training_hooks=train_hook_list) def main(_): hvd.init() # Load params passed by Flags hparams = tf.contrib.training.HParams() for key, val in vars(FLAGS).items(): hparams.add_hparam(key, val) # Horovod: pin GPU to be used to process local rank (one GPU per process) config = tf.ConfigProto() config.gpu_options.allow_growth = True config.gpu_options.visible_device_list = str(hvd.local_rank()) # Load and set train_image_size to resize image network_fn = nets_factory.get_network_fn( hparams.model_name, num_classes=hparams.num_classes) hparams.set_hparam( 'train_image_size', hparams.train_image_size or network_fn.default_image_size) del network_fn hparams.set_hparam( 'preprocessing_name', hparams.preprocessing_name or hparams.model_name) # Horovod: save checkpoints only on worker 0 to prevent other workers from # corrupting them. model_dir = hparams.model_dir if hvd.rank() == 0 else None image_classifier = tf.estimator.Estimator( model_fn=model_fn, model_dir=model_dir, config=tf.estimator.RunConfig(session_config=config), params=hparams) # Train and eval input functions # train_input_fn = (lambda: cifar10.get_ds( # hparams, 'train', mode=tf.estimator.ModeKeys.TRAIN, file_pattern=_FILE_PATTERN)) train_input_fn = (lambda: cifar10.get_ds( hparams, 'train', mode=tf.estimator.ModeKeys.TRAIN)) eval_input_fn = (lambda: cifar10.get_ds( hparams, 'test', mode=tf.estimator.ModeKeys.EVAL)) # Divide max_number_steps by number of gpus image_classifier.train(input_fn=train_input_fn, max_steps=hparams.max_number_of_steps // hvd.size()) if hvd.rank() == 0: image_classifier.evaluate(input_fn=eval_input_fn) if __name__ == '__main__': tf.logging.set_verbosity(tf.logging.INFO) parser = args_parser.FuncArgParser() FLAGS, unparsed = parser.parse_known_args() tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
资料来源:https: //gist.github.com/jorditorresBCN/92c0b7739c426e4e525c4a3bcc6c105c
4.5案例研究
下图显示了使用最多64个GPU的ResNet 101获得的结果。凭借64个GPU,我们实现了90%的扩展效率。
我们使用了由(from )获得的程序打印的average_example_per_sec值(在最后一步中):hookutils
INFO:tensorflow:accuracy = 0.099609375, loss = 2.3029768, global_step = 700 (32.501 sec) INFO:tensorflow:Benchmark metric: {‘name’: ‘average_examples_per_sec’, ‘value’: 196.5344715545388, ‘unit’: None, ‘global_step’: 711, ‘timestamp’: ‘2018–11–29T17:49:16.269507Z’, ‘extras’: []} INFO:tensorflow:Benchmark metric: {‘name’: ‘current_examples_per_sec’, ‘value’: 196.83145415775098, ‘unit’: None, ‘global_step’: 711, ‘timestamp’: ‘2018–11–29T17:49:16.269683Z’, ‘extras’: []}
提示:请务必更改model_dir参数以防止使用先前执行的检查点。
在这篇文章中,我们介绍了如何使用TensorFlow Estimators和Horovod在多个GPU上分发单个深度神经网络的训练。