wearen 2019-09-30
介绍如何使用Celery运行并行任务,以及我们如何和为什么在Celery的Canvas任务基本数据类型上构建一个API。
Zymergen的技术目标之一是使生物学家能够以高通量和高度自动化的方式探索微生物的基因编辑。Zymergen的计算生物学团队负责构建软件来帮助科学家设计和执行这些基因编辑。(有关简要概述,请参阅Zymergen 101教程)。
我们团队的工作流是用Python编写的,并使用Celery异步运行。为了提高这些工作流的性能,我们开发了一个简单的API,用于使用Celery编写并行应用程序代码。在这篇文章中,我们将首先介绍如何使用Canvas构建一个简单的并行工作流,Canvas是Celery附带的一个任务工作流基本数据类型模块。然后,我们将逐步了解如何抽象出Canvas的细节,为我们的用例构建一个简化的并行任务API。
计算生物学团队开发了几个工作流,用于在Python中设计和执行大量的基因编辑。这些工作流通过使用Celery构建的内部任务运行服务ZWork执行。其中一个工作流称为Campaign Design,涉及到读取和写入存储在我们数据库中的潜在地大量微生物基因组数据,以及在单个微生物基因组序列和一个建议的编辑序列之间执行序列比对。单个基因编辑的设计需要1到4分钟,而一个单个Campaign Design包含500-700个单独的基因编辑,这将导致潜在的运行时间(以天为单位)。
这些很长的运行时间对Zymergen科学家来说是一个瓶颈。为了提高效率,计算生物学团队首先研究了序列化过程本身,并确定了一些改进的地方。然而,由于每个设计都是完全独立的,并且其目标是同时设计许多更改,所以我们认为并行化将是性能的主要优势。此外,并行化也可以应用于其他工作流:许多其他计算生物学工作流都是计算密集型的,但大多数都是“令人尴尬的并行”。事实上,这些工作流中的大多数都可以用一个简单的fork-join模式来处理,在这个模式中,一组输入被分配给独立的worker进行计算。
并行化每个工作流有两个主要步骤(Celery中称为“任务”)。第一个步骤是重构任务代码本身,以便在并行运行的工作周围有清晰的边界。第二个步骤是开发基础设施来执行这个重构的代码。ZWork的Celery基础设施在这一点上是支持良好并经受过考验的。如果可能的话,开发者会希望避免跳到一个新技术上去。然而,目前我们还不清楚Celery将如何处理并行化的过程的负载。一个Hack Week项目中的比较乐观的结果表明,Celery可以胜任。
考虑到迁移到并行化的工作流的主体工作是在重构代码,而且Celery长期运行的效果怎么样还有不确定性,所以我们决定在Celery之上创建一个简化的API,让我们的软件工程师能专注于他们工作流的逻辑上,并开发并行化的工作流,而不需要了解Celery的内部细节。这还允许我们在未来更改后端,同时保持应用程序逻辑的完整性。最后,它将提供一个向该中心任务API添加一些小功能的机会,比如标准化的输入验证。
直接跳到结果,应用了并行化之后,我们现在看到每个设计的总运行时间为5到15秒,而不是1到4分钟。通过我们的API利用Canvas,我们可以实现更大程度的并行化,并使用我们现有的基础设施为这个问题带来更多的计算能力。
下面的部分我们将介绍Celery的设置,以及如何使用Canvas先构建一个序列,然后再构建一个并行任务。要将一个现有的Celery任务转换为一组Canvas基本数据类型去形成一个“fork-join”任务,以便在多个Celery worker之间分配计算,我们只需要这个方法。在浏览过这些之后,我们指出了使用Celery处理大型数据集的内存使用限制。如果您熟悉用于并行任务的Canvas,您可以直接跳到我们设计的API抽象。
我们的大部分技术堆栈是建立在开源Python生物信息学和web工具(BioPython、Numpy、Django、Celery、Airflow等等)之上的。我们使用Celery为这些任务创建一个灵活的任务运行器(ZWork)。在Zymergen的早期,作为一个需要运行异步任务队列的小型启动,Celery是一个自然的选择。随着公司的发展,我们增加了其他处理分布式工作的技术(AWS Lambda、AWS Batch等)。然而,Celery仍然是一个很好的资源。对于我们的运行时间超过Lambda的时间限制的任务,它工作得很好。与Batch相比,它的启动开销较低,这使得我们的Celery任务可以在用户提交时就立即开始。我们有各种各样的任务运行在我们的ZWork集群上。
Celery从3.0版本开始就附带了Canvas,用于设计复杂的任务工作流。Canvas提供了一些基本数据类型(group,、chain、 partial等),用于从现有的Celery任务组成一个工作流。Canvas非常适合计算生物学团队立即进行并行化所需的任务的类型。
下面的代码示例假设您有现存的Celery worker。有关开始使用Celery的详细信息,请参见:http://docs.celeryproject.org/en/latest/getting-started/firststeps-with-celery.html
下面是一个简化的单词计数示例。我们希望传入一个文档列表,并返回一个单词字典,以确定总的使用情况。我们假设每个文档都是一个单词列表,每个单词以一个空格隔开。
我们的目标是创建一个可以按以下方式被调用的Celery签名:
使用Canvas的串行任务
一个标准的串行版本是这样的:
这里创建了一个Python函数,并使用shared_task装饰器进行装饰。这会将该函数注册为一个Celery签名。
使用Canvas的并行任务
它的并行版本是这样的:
并行代码具有与串行代码相同的基本功能。其额外的开销提供了一种方法,可以将单词计数步骤分配给多个不同的worker,使用NUMBER_OF_GROUPS常量指定。setup_step用于为独立计数准备文档。process_step对一个文档子集进行计数,然后join_step对计数进行结合。我们按顺序来看看这些函数。
setup_step
此设置会接受输入列表并将其转换为NUMBER_OF_GROUPS列表(包括可能的空列表)的一个嵌套列表。
首先跳出来的可能是NUMBER_OF_GROUPS常量。一个Celery组必须在任务开始时静态声明——这里不可能动态创建条目的数量。这意味着你必须预先决定把工作分成多少块。
我们使用的值是专用的并行Celery worker数量的两倍。如果输入比该值少,向该过程步骤发送一个no-op值(空列表)就可以很好地工作。你浪费了一个过程,但它的时间很短。对于大型任务来说,这个值可能会导致不规律的完成时间和worker池的非最佳使用,但对我们来说,这是一个合理的默认值。当我们着眼于Celery worker的动态变化时,这个值可能会改变。
process_step
除了添加一个索引之外,此过程步骤中的逻辑看起来与串行版本相同。
在查看process_step函数的逻辑之前,让我们先看看group签名的创建过程:
我们传递给process_step函数的group_index对于确保每个worker只操作总数据的一部分非常重要。其原因和潜在的负面影响下面有讨论。
process_step然后对整个数据的一个切片执行一个操作。
join_step
group基本数据类型会收集正在运行的process_step的每个实例的所有结果,并将这些结果组合成一个列表,然后该列表被传递给链中的最后一个入口,也就是join_step任务。
join_step的第一行是重新创建Counter对象。作为将数据分发给任务的一部分,当Celery在执行JSON序列化/反序列化时,在处理步骤中创建的Counter对象将被转换为一个常规的Python字典。这是一个重要的提醒,您的任务的每个输入和输出都必须是可序列化的。如果你选择使用JSON进行此序列化过程,那就需要考虑往返过程中可能发生的任何转变。其他序列化方法也是可用的,包括pickle。
最终我们需要的Celery基本数据类型是chain。这将把setup_step、process_steps组和join_step链接到一个可调用的对象中:
像这样来调用它:
在开发我们的第一个并行任务期间,我们了解到Celery向一个组发送数据的方式可能会有潜在地大量内存使用。组中的每个任务接收相同的输入数据——数据列表的整个列表——然后处理其特定的片。为了有一个单个的处理步骤来处理数据子集,我们使用NUMBER_OF_GROUPS常量引入了索引值。我们还创建了一个具有group_index值集合的偏函数。这个偏函数被注册为一个Celery签名,实质上是将其放入一个任务队列中,等待来自setup_step的剩余输入。
为了重申这一点,需要由单个worker处理的全部数据与数据块的大小成正比。
下图是一个不同的示例任务(对一组数字求和),说明了数据复制是如何发生的:
Celery向组发送数据的方式可能会潜在地占用大量内存。这个对一个数字列表求和的示例说明了整个输入数据是如何进行复制而供每个worker使用的。
这可能是非常低效的。如果你在处理大型数据的话,不要这样做!Celery可能行不通。如果您有需要长时间进行计算的合理大小的数据(或使process_step的输入变成合理大小的方法),那么它仍然可以运行——这正是我们的用例。对于我们的许多任务来说,来自用户的输入是对象id。我们的处理步骤将这些id传递给每个单独的并行worker,然后该worker再从数据库中获取它所需的实际对象。
但是,我们有一个并行任务,它在设置步骤中就将一个大型嵌套数据结构分解为Python字典。此外,最近Zymergen科学家使用这个任务的方式发生了变化,这意味着这个已经很大的数据结构已经扩大了近一个数量级。在使用如此大的一个输入来测试此任务时,我们用于排队的RabbitMQ就会耗尽内存。我们正在为这个特别的任务考虑其他选项,比如将子对象放置在数据库或其他缓存中。
注意到这些限制之后,我们就可以使用Celery/Canvas很好地完成某些任务。我们看到速度提高了一个数量级:从1到4分钟提高到5到15秒。
在证明我们可以使用Canvas构建所需的任务类型之后,我们想要构建一个简化的API,使新任务开发人员能够专注于其任务的业务逻辑,而不需要考虑底层框架。这种来自Canvas特性的业务逻辑的分离允许我们从使用Celery作为后端来向对我们的API实现进行更改来完成任务的潜在转移,而不需要对现有任务进行任何更改。
我们的API还隐藏了Canvas的一些限制。例如,作为实现业务逻辑的人员,你不再确切地知道输入是如何被切片、分割和分布的。你编写的设置逻辑会将输入分割成离散的逻辑块,其中的参数名称会映射到你的处理步骤的输入上。编写处理步骤是为了对一份单个数据进行操作,从而简化逻辑。这样做既有好处也有代价。如果你不知道设置步骤返回的数据在内存中被复制了几次,你可能就无法进行相应的计划。
除了将任务逻辑与其执行过程分离之外,我们的API还提供了两个附加特性。两者都有助于简化任务输入:
下面是用我们的API重新创建的计数单词的相同示例任务:
你可以按照以下方式来调用它:
该API允许我们为处理和join步骤声明函数参数,这些参数接受直接来自用户输入的输入。这需要一些幕后工作,但它简化了应用程序开发人员的工作。
例如,WordCounterTask允许我们声明一个文档分隔符。所以代替这个:
你会获得这个结果:
你可以在这里找到完整的代码。地址:https://gist.github.com/battmatt/3bd206fba7cd1aa13f37a9c7b88a23a3
Zymergen的软件工程师们开发了一些工具,使科学家们能够利用传统实验室环境中不可能的手段来驾驭生物学。在这样做的时候,我们努力在满足今天的需要和未来的预期需要之间取得平衡。我们基于Celery的微生物设计工作流就是这一理念的一个很好的例子。这项任务的串行版本很慢,但是对于当时的用例来说是可以接受的。当我们的科学家登上一个基因组明显更大的微生物时,执行时间几乎翻了两番。考虑到一个完整的设计要包含500-700个单独的基因编辑,为一个任务完成等待几天显然不是很好。
使用Celery/Canvas作为此任务后端的决定是非常实用的:Celery是已经应用了的基础设施。我们本来还可以使用其他技术(AWS Batch或Lambda都被用于其他内部项目)。然而,我们没有放弃现有的Celery基础设施,而是直接在Celery顶层开发了一个并行解决方案,并且可以成功运行!使用32个Celery worker运行并行化任务时,大型微生物的计算时间为27秒,而平均值为235秒。通过使用32倍的核心,速度提升达到10倍。增加的序列化/反序列化和数据库调用会带来很大的开销,但是总的来说,终端用户已经很好地接受了这一巨大的改进,并且对于这种规模的问题,使用核心解决该问题的开销是可以接受的。
在现有的Celery基础设施之上创建一个简单的、非Celery专用的API,允许我们添加一些小的增量改进,比如,增加了在不调用Celery的情况下支持端到端测试,并将其直接应用于所有任务。对我们的API进行标准化还确保了每个任务都是以一致的模式进行编写。未来工作量的需求可能会有所不同;即将出现的问题可能无法用Celery来处理,并且可能需要对基础设施进行更改。我们提到的Celery相关的内存使用问题可能是强制执行函数,或者我们可能首先会遇到一些其他无法预见的问题。理想情况下,我们的API——我们设计它是为了让我们能够保留现有的基础设施,同时提供一个在未来不需要重写用户代码的迁移路径——将会带来好处。
Matthew Batterton是Zymergen计算生物学团队的软件工程经理。
英文原文:https://medium.com/@ZymergenTechBlog/building-a-parallel-task-api-with-celery-dbae5ced4e28
译者:好酒不上头