bluestartlxp 2019-11-27
当执行的查询数量很大时,数据存储库通常是高要求系统的瓶颈。延迟批处理执行器(DelayedBatchExecutor)是一个组件,可通过在Java多线程应用程序中对所需查询进行批处理来减少所需查询的数量。
n个查询1个参数与1个查询n个参数
让我们假设一个Java应用程序执行对关系数据库的查询,以在给定其唯一标识符(id)的情况下检索Product实体(行)。
查询看起来像这样:
现在,要检索n种产品,可以通过两种方法进行:
使用IN运算符或OR的组合,对n个参数执行一次查询以同时检索n个产品
后者在网络流量和数据库服务器资源(CPU和磁盘)方面更为有效,因为:
这不仅适用于SELECT操作,而且适用于其他操作,例如INSERT,UPDATE和DELETE,实际上,JDBC API包括这些操作的批处理操作。
这同样适用于NoSQL存储库,其中大多数都显式提供BULK操作。
延迟批处理执行器
需要从数据库检索数据的Java应用程序(如REST微服务或异步消息处理器)通常实现为多线程应用程序(* 1),其中:
在这种情况下,数据库很可能在很短的时间间隔内多次执行相同的查询。
如前所述,如果将这1个参数的n个查询替换为具有n个参数的单个等效查询,则应用程序将使用较少的数据库服务器和网络资源。
好消息是,可以通过以下涉及时间窗口的机制来实现它 :
第一个尝试执行查询的线程将打开一个时间窗口,因此其参数存储在列表中,并且该线程已暂停。在时间窗口内执行相同查询的其余线程会将其参数添加到列表中,并且也会被暂停。此时,尚未在数据库上执行任何查询。
时间窗口结束或列表已满(预先定义了最大容量限制)后,便会使用列表中存储的所有参数执行单个查询。最后,一旦数据库提供了该查询的结果,每个线程将接收其相应的结果,并且所有线程将自动恢复。
我为自己(延迟批处理执行器)构建了此机制的简单轻便的实现,可以轻松在新的或现有的应用程序中使用。它基于 Reactor库,并且使用带有通量的Flux缓冲发布者作为参数列表。
使用延迟批处理执行器的吞吐量和延迟分析
让我们假设一个针对产品的REST微服务,它公开了一个端点,用于从给定的数据库中检索产品数据 productId。如果不使用 延迟批处理执行器,则说到端点每秒有200次命中,则数据库每秒执行200个查询。如果端点使用的 时间窗口延迟批处理执行器配置为50毫秒,最大容量 = 10个参数,则数据库每秒仅执行20个查询,每个参数10个参数,但代价是最多在50毫秒内增加延迟(* 2)对于每个线程执行。
换句话说,为了将等待时间增加50 ms(* 2),在保持系统整体吞吐量的同时,数据库每秒收到的查询减少了10倍。
其他有趣的配置:
延迟批处理执行器在行动
深入研究Product微服务示例,假设对于每个传入的HTTP请求,微服务的控制器都要求我们检索提供其ID的Product(Java Bean),因此它将调用该方法:
public Product getProductById(Integer productId) DAO组件的ProductDAO。
让我们看看不带和带的DAO的实现 延迟批处理执行器。
没有延迟批处理执行器
使用延迟批处理执行器
首先,延迟批处理执行器必须在DAO中创建的实例,在本例中为delayedBatchExecutorProductById。它需要以下三个参数:
注意:我们将在后面看到为什么延迟批处理执行器的标识(delayedBatchExecutor ProductById)是该类的实例DelayedBatchExecutor2<Product, Integer>
其次,DAO方法public Product getProductById(Integer productId)已经过重构,可以简单地调用实例的execute方法,仅此delayedBatchExecutor ProductById而已。所有的“魔术”都是由DelayedBatchExecutor。
之所以delayedBatchExecutor ProductById是的实例,DelayedBatchExecutor2<Product, Integer>是因为其execute方法返回一个Product实例并接收一个Integer实例作为其参数。因此,我们有: DelayedBatchExecutor2<Product, Integer>.
如果execute方法需要接收两个参数(例如an Integer和a String)并返回的实例Product,则定义为DelayedBatchExecutor3<Product, Integer,String> ,依此类推。
最后,该 retrieveProductsByIds方法必须返回a List<Product>并接收a List<Integer>作为参数。
如果我们使用DelayedBatchExecutor3<Product, Integer,String>,则retrieveProductsByIds必须是List<Product> retrieveProductsByIds(List<Integer> productIdsList, List<String> stringList)
就是这样。
一旦运行,执行控制器逻辑的并发线程将getProductById(Integer id)在某个时候调用该方法,并且该方法将返回相应的乘积。他们不会知道他们实际上可能已经被暂停并恢复了延迟批处理执行器.
超越数据仓库
尽管本文与数据存储库有关, 延迟批处理执行器 但是可以在其他上下文中使用,例如,在对REST微服务的请求中。同样,用一个参数启动n个GET请求要比使用n个参数启动1个GET要昂贵得多。
延迟批处理执行器的改进
我创建 延迟批处理执行器并使用了一段时间,以有效地处理由个人项目中的并发线程启动的多个查询的执行。我相信它对其他人也可能有用,所以我决定将其公开。
话虽如此,仍有很大的改进空间并可以扩展所提供的功能 延迟批处理执行器。最有趣的是能够根据执行的特定条件动态更改参数 延迟批处理执行器(窗口时间和最大容量),以最大程度地减少等待时间,同时利用具有n个参数的查询。