AI产品经理Pipi 2019-03-11
随着互联网金融业务和相关技术的不断发展,传统金融行业为满足业务快速发展需求,正在积极引入各类开源技术,以快速抢占市场。那么,以金融和科技作为双驱动的平安银行在开源技术的引入方面是如何评估,运用到哪些业务场景,以及面对复杂的网络环境,是如何去部署的呢?
本文将以 Apache RocketMQ 为例,和您一起了解平安银行在开源技术选型方面的思考和实践。
复杂网络环境下的部署实践;
多隔离区场景下的部署情况;
多 IDC 场景下的部署情况;
改造实践和遇到的小插曲;
RocketMQ 在平安银行的应用场景
目前,平安银行通过 RocketMQ 解决了数据预加、信息通知和状态变化方面的业务需求,接下来,我们通过 App 登录、资产总览和工资理财 3 个应用场景来展开讲下。
App 登录:
当用户打开平安银行 App 的时候,系统会根据用户的登录 ID 去加载相应的用户数据,比如银行卡、信用卡和理财产品等,以及一些系统通知。这个场景下,我们用到了 RocketMQ 的异步处理能力,即预加载需要使用的数据,提升用户体验。
资产总览:
进入平安银行 App 资产总览的页面,页面显示账户余额、各类理财产品(黄金、基金和股票等),以及贷款等方面的信息。平安银行对接了各类基金、黄金和股票等来自不同金融主体、不同系统的数据,具有种类多、异构系统多和变化快的特点。我们的目标就是让资产总览数据尽可能准确,不同种类的资产变动的时候发出通知,资产系统收到通知后,根据变化的数据来计算出用户当前的资产总览。
工资理财:
工资理财是指每月工资下发到银行卡后,系统可以实现自动买入用户设置好的理财产品,例如买一些定投类的理财产品。这里信息的传递流程是:
那么,在这些场景中,我们对消息引擎会有哪些要求呢?
A、高可用、高可靠和高性能,这是金融行业引入开源技术的基本要求;
B、堆积能力,代发工资的用户很多,一个公司的员工会在某个时间点集中发放;
C、顺序能力,即账户变动在先,发出通知在后;
D、事务性能力,如果没有事务性,有可能会出现账户变动了,但没有触发购买理财产品的情况;
E、重试和延迟消息功能,比如工资发放的时候,可能是在晚上,这时候系统会自动把购买理财的动作放在第二天白天去操作,再发出通知;
F、消息回溯能力,就是出现问题的时候,我们可以把这个消息进行回溯,重新进行消息的处理,提供完整的消息轨迹;
在技术选型的过程中,RocketMQ 符合我们在这些典型使用场景中对消息产品的需求,在引入的过程中,平安银行也对社区做了相应的贡献。
多测试子环境下的服务调用场景
平安银行有多套测试子环境,用来对不同的feature进行测试,即图中的 FAT、FAT001、FAT002、FAT003等。传统银行系统由大型机时代向更面向互联网用户的零售时代转型过程中,不可避免微服务化,传统较集中式系统会被划分为较多的微服务,正常的情况如下图,服务 A 调用服务 B,服务 B 调用服务 C,服务 C 调用服务 D。
随着业务的需求,新的 feature,我们需要对服务 A 和 B 进行改动。相比在FAT环境里去部署测试,更合适的方式是另起一套 FAT 环境,这里我们命名为 FAT001,把服务A和B部署上去,A 调用 B,B会调用原来 FAT 环境里的 C 和 D。
此时,另一个新的需求,需要对服务 A 和 C 进行改动。如果直接发布到FAT 或 FAT001 肯定是不对的,会影响正在进行的测试,此时,我们会再起一套测试环境,命名为FAT002,发布服务 A 和 C。由于 FAT002 里没有服务 B,所以服务A要调用服务 B 就需要去 FAT 环境(FAT 定义为较稳定的测试子环境)。服务 B 调用服务 C 的时候,就不应该调用本环境的 C了,而是调动 FAT002 的 C 才能实现测试功能。
再假设,系统同时还会有另外一个 feature 在测试 C 和 D,此时的调用逻辑也是一样的,系统调用服务 A 和 B 的时候是在 FAT,调用服务 C 和 D 的时候会到 FAT003 的环境。
以上的服务调用场景是可以通过微服务框架解决的,进行全链路测试,但在生产环境中,用户的真实请求比测试环境中会更复杂一些。
真实的用户请求过程
我们看一下真实的用户请求。
APP发起一个请求请求,进入网关,需要知道请求哪一个测试环境。通常的做法是:测试人员需要在APP上选好子环境,例如选择 FAT001,我们可以直接把请求 FAT001 的网关(每个环境网关单独部署),或者在requestheader上添加标识,让网关去区分它来源于哪一个环境(网关统一部署)。假如网关判断请求来源于 FAT002,那就会把分发给 FAT002环境进行处理。
以上是服务调用的请求路径,比较好理解,但到了消息队列上,问题会变得复杂些,假如一个 feature 只是更改了消费者,那如何把这条消息传递到改的消费者应用上去进行测试,而不被其它环境的消费者消费掉,这是我们需要去考虑的问题。
来看下具体的情况,集群部署了 Broke A 和 Broke B,TopicA 分别部署在这两个Broker上。 此时,Producer Group A 向 Topic A 中写数据,Consumer Group A去消费,这种情况下是不会有任何问题。
但如果新增一套 FAT001 的环境,要求 FAT001 发布的消息,只能由 FAT001 来消费,FAT 不能消费,这种情况下我们该怎么解决?
这些都不能解决我们的问题。️
一方面成本太高,另一方面如果这个feture测试完成了,需要把相关的
应用再切回 FAT 进行处理,实现太过复杂。️
我们想一下,多个 feature 同时进行测试,DB 会部署一套还是多套?
首先一个 feature 不会更改所在的应用,一般来说 DB 是部署一套的,在数据库里面添加字段,来识别数据是来源于哪一个子环境,如果多套部署,不更改的应用取不到新部署的 DB 数据,无法进行全链路测试,所以同样的,我们也没有在每个子环境都部署一套 RocketMQ,而是部署统一部署,通过 RPC 路由把请求路由到正确的生产者集,改造消息路由算法把消息路由到正确的消费者进行处理。
真实的用户请求过程
生产者变更
在上图中生产者变更的场景下,默认的场景 FAT发送,FAT 消费 ,没有问题的,假设 FAT001 的生产者发布了,需要 FAT001 发送到MQ集群,FAT 是可以直接消费。
生产者、消费者同时变更
在上图生产者和消费者同时变更的场景下,如果消费者在 FAT001也部署了应用,需要FAT消费者不能消费由FAT001产生的消息,而是由 FAT001的消费者去消费。我们的处理方式是在逻辑上把Topic A下面的Queue进行分组,相当于加了逻辑分组,如果消费者在 FAT001 有部署,我们会把 Queue 的分组扩大,在其默认设置的情况下再翻一倍,新增的 Queue 就是给到 FAT001 进行消费的。
只有消费者变更
再来看看只有消费者变更的场景,如上图。
假设有个feature只需要更改消费者,部署在 FAT001。也是可以通过逻辑分组的方式,实现生产者根据请求来源来发送消息到队列 FAT001 逻辑分组内的 Queue,从而只让部署在 FAT001 的消费者消费。
通过以上 3 个场景,我们了解到添加逻辑分组的必要性,实现过程并不复杂。主要做了以下调整:️
我们选择了动态创建的方式,这个过程中,我们添加了 Meta Server 进行元数据管理,进行动态创建:
由于对安全上的重视,金融行业的网络环境相比其他行业会更复杂。整个隔离区分为以下几个部分:
此外,从安全的角度出发,所有的区域在生产环境下都通过防火墙进行隔离,这就给我们部署 RocketMQ 带来了很大的实施难度。如果只部署一套,面对这么多的防火墙,生产者消费者到集群的的流量跨墙,会给网络带来极大的不稳定,遇到瓶颈,防火墙几乎无法在线平滑扩容;如果每个子环境都部署一套,又带来运维复杂度,而且还是存在数据同步和跨墙消费的问题。
最终,我们采用的是折中的办法,即统一部署加分隔离区部署,这样做的益处是:
同城多IDC,可以认为近似局域网,比较好处理,但异地多IDC多活场景,目前我们还没有特别好的解方案,多活不可避免面临数据分片、数据合并和数据冲突的解决等问题。
如果 Topic 下数据有多活需求,我们暂时通过复制方式来处理。但这类手工模拟消费者消费数据写入新集群的复制方式,会存在一些问题,即复制到另一个集群之后 offset 会改变,处理逻辑就会有偏差。我们通过 pull 的方式自定义的去根据 offset 去进行消费。当故障转移到新的集群需要去消费的时候,需要获取到新集群里面正确的offset 值。此时,这个值和之前集群里的已经不一样了,需要根据时间得到新集群里正确的offset 值,再去进行消费。在没有更好的解决方案前,治理就显得很关键了。
不过,我们注意到,在 RocketMQ 最新发布的版本里,提供了 DLedger 的特性,DLedger 定位的是一个工业级的 Java Library,可以友好地嵌入各类 Java 系统中,满足其高可用、高可靠、强一致的需求。我们会尽快对这一特性进行集成和测试。
我们在对 RocketMQ 的使用过程中,添加了以下功能或特性:
A. 为生产者提供消息的堆积能力。
B. 将所有配置管理部署到配置中心,并做云端化处理,以满足动态修改的需求。
C. 在 4.3 版本还未提供事务处理能力前,我们在本地数据库里去建一张消息表,数据库更改数据状态的时候,会同步将数据写入消息表。若发送失败,则进行补偿。并在客户端里进行封装。
D. 实现统一的消息者幂等处理。
E. 添加身份认证和消息认证(注:RocketMQ 4.3 版本中已经实现身份认证功能)
当然,也遇到了一些小插曲,基本都是使用上的小问题,可能大家也会碰到:
A. 一个应用使用多个RocketMQ集群时,未加载到正确的配置。在Client 端,如果没有对 instancename 进行配置,一个应用连多个集群会失败。
B. 在大数据的场景下,内存溢出。订阅的 Topic 越多,每个 Queue 在本地缓存的 message 也会越多,默认每个 Queue 1000条,可能会把内存打爆,可根据实际情况调整。
C. 删除数据时 IO 抖动,默认每天凌晨4点删除数据,量上来后出现 IO 抖动,配置了消息删除策略,默认逗号分隔开,多配几个时间删除就可以了。
D. Broker上日志报延迟消息找不到数据文件。在主备切换的演练过程中,出现了延迟消息在 Broker 上处理异常的情况。当主切换到备端时,延迟消息在 Broker 上保存的文件被自动删除,再切回主,由于延时消息的元数据感觉在,会查询文件进行处理,此时已找不到文件。
E. 挂 NAS 的时候,IP 获取了 NAS 的私网地址,并被提交给了服务端。
以上就是我们在部署过程中遇到的一些小插曲,基本都是可以很快定位原因,解决的。
总的来看,RocketMQ 对平安银行的消息系统建设的帮助是非常大的,尤其是满足了数据持久化、顺序消费和回溯的需求,此外,在消息的查询方面,也比我们之前使用的消息引擎好很多。最后再分享一点自己对中间件的一点感悟:中间件使用重在治理,规范不先行,开发两行泪。
本文作者:吴建峰,GitHub ID @devilfeng,来自平安银行平台架构部基础框架团队。
作者:中间件小哥