lomenw 2019-06-30
Dubbo 压测插件已开源,本文涉及代码详见gatling-dubbo
Gatling 是一个开源的基于 Scala、Akka、Netty 实现的高性能压测框架,较之其他基于线程实现的压测框架,Gatling 基于 AKKA Actor 模型实现,请求由事件驱动,在系统资源消耗上低于其他压测框架(如内存、连接池等),使得单台施压机可以模拟更多的用户。此外,Gatling 提供了一套简单高效的 DSL(领域特定语言)方便我们编排业务场景,同时也具备流量控制、压力控制的能力并提供了良好的压测报告,所以有赞选择在 Gatling 基础上扩展分布式能力,开发了自己的全链路压测引擎 MAXIM。全链路压测中我们主要模拟用户实际使用场景,使用 HTTP 接口作为压测入口,但有赞目前后端服务中 Dubbo 应用比重越来越高,如果可以知道 Dubbo 应用单机水位将对我们把控系统后端服务能力大有裨益。基于 Gatling 的优势和在有赞的使用基础,我们扩展 Gatling 开发了 gatling-dubbo 压测插件。
实现 Dubbo 压测插件,需实现以下四部分内容:
协议部分,这里主要定义 Dubbo 客户端相关内容,如协议、泛化调用、服务 URL、注册中心等内容,ProtocolBuild 则为 DSL 使用 Protocol 的辅助类
执行部分,这里的作用是发起 Dubbo 请求,校验请求结果并记录日志以便后续生成压测报告。ActionBuild 则为 DSL 使用 Action 的辅助类
检查部分,全链路压测中我们都使用Json Path
检查请求结果,这里我们实现了一样的检查逻辑。CheckBuild 则为 DSL 使用 Check 的辅助类
Dubbo 插件的领域特定语言,我们提供了一套简单易用的 API 方便编写 Duboo 压测脚本,风格上与原生 HTTP DSL 保持一致
协议部分由 5 个属性组成,这些属性将在 Action 初始化 Dubbo 客户端时使用,分别是:
协议,设置为dubbo
泛化调用设置,Dubbo 压测插件使用泛化调用发起请求,所以这里设置为true
,有赞优化了泛化调用的性能,为了使用该特性,引入了一个新值result_no_change
(去掉优化前泛化调用的序列化开销以提升性能)
Dubbo 服务的地址:dubbo://IP地址:端口
Dubbo 注册中心的协议,设置为ETCD3
Dubbo 注册中心的地址
如果是测试 Dubbo 单机水位,则设置 url,注册中心设置为空;如果是测试 Dubbo 集群水位,则设置注册中心(目前支持 ETCD3),url 设置为空。由于目前注册中心只支持 ETCD3,插件在 Dubbo 集群上使用缺乏灵活性,所以我们又实现了客户端层面的负载均衡,如此便可抛开特定的注册中心来测试 Dubbo 集群水位。该特性目前正在内测中。
object DubboProtocol { val DubboProtocolKey = new ProtocolKey { type Protocol = DubboProtocol type Components = DubboComponents def protocolClass: Class[io.gatling.core.protocol.Protocol] = classOf[DubboProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]] def defaultProtocolValue(configuration: GatlingConfiguration): DubboProtocol = throw new IllegalStateException("Can't provide a default value for DubboProtocol") def newComponents(system: ActorSystem, coreComponents: CoreComponents): DubboProtocol => DubboComponents = { dubboProtocol => DubboComponents(dubboProtocol) } } } case class DubboProtocol( protocol: String, //dubbo generic: String, //泛化调用? url: String, //use url or registryProtocol: String, //use registry registryAddress: String //use registry ) extends Protocol { type Components = DubboComponents }
为了方便 Action 中使用上面这些属性,我们将其装进了 Gatling 的 ProtocolComponents:
case class DubboComponents(dubboProtocol: DubboProtocol) extends ProtocolComponents { def onStart: Option[Session => Session] = None def onExit: Option[Session => Unit] = None }
以上就是关于 Protocol 的定义。为了能在 DSL 中配置上述 Protocol,我们定义了 DubboProtocolBuilder,包含了 5 个方法分别设置 Protocol 的 protocol、generic、url、registryProtocol、registryAddress 5 个属性。
object DubboProtocolBuilderBase { def protocol(protocol: String) = DubboProtocolBuilderGenericStep(protocol) } case class DubboProtocolBuilderGenericStep(protocol: String) { def generic(generic: String) = DubboProtocolBuilderUrlStep(protocol, generic) } case class DubboProtocolBuilderUrlStep(protocol: String, generic: String) { def url(url: String) = DubboProtocolBuilderRegistryProtocolStep(protocol, generic, url) } case class DubboProtocolBuilderRegistryProtocolStep(protocol: String, generic: String, url: String) { def registryProtocol(registryProtocol: String) = DubboProtocolBuilderRegistryAddressStep(protocol, generic, url, registryProtocol) } case class DubboProtocolBuilderRegistryAddressStep(protocol: String, generic: String, url: String, registryProtocol: String) { def registryAddress(registryAddress: String) = DubboProtocolBuilder(protocol, generic, url, registryProtocol, registryAddress) } case class DubboProtocolBuilder(protocol: String, generic: String, url: String, registryProtocol: String, registryAddress: String) { def build = DubboProtocol( protocol = protocol, generic = generic, url = url, registryProtocol = registryProtocol, registryAddress = registryAddress ) }
DubboAction 包含了 Duboo 请求逻辑、请求结果校验逻辑以及压力控制逻辑,需要扩展 ExitableAction 并实现 execute 方法。
DubboAction 类的域 argTypes、argValues 分别是泛化调用请求参数类型和请求参数值,需为 Expression[] 类型,这样当使用数据 Feeder 作为压测脚本参数输入时,可以使用类似 ${args_types}
、${args_values}
这样的表达式从数据 Feeder 中解析对应字段的值。
execute 方法必须以异步方式执行 Dubbo 请求,这样前一个 Dubbo 请求执行后但还未等响应返回时虚拟用户就可以通过 AKKA Message 立即发起下一个请求,如此一个虚拟用户可以在很短的时间内构造大量请求。请求方式方面,相比于泛化调用,原生 API 调用需要客户端载入 Dubbo 服务相应的 API 包,但有时候却拿不到,此外,当被测 Dubbo 应用多了,客户端需要载入多个 API 包,所以出于使用上的便利性,Dubbo 压测插件使用泛化调用发起请求。
异步请求响应后会执行 onComplete 方法,校验请求结果,并根据校验结果记录请求成功或失败日志,压测报告就是使用这些日志统计计算的。
为了控制压测时的 RPS,则需要实现 throttle 逻辑。实践中发现,高并发情况下,泛化调用性能远不如原生 API 调用性能,且响应时间成倍增长(如此不能表征 Dubbo 应用的真正性能),导致 Dubbo 压测插件压力控制不准,解决办法是优化泛化调用性能,使之与原生 API 调用的性能相近,请参考dubbo 泛化调用性能优化。
class DubboAction( interface: String, method: String, argTypes: Expression[Array[String]], argValues: Expression[Array[Object]], genericService: GenericService, checks: List[DubboCheck], coreComponents: CoreComponents, throttled: Boolean, val objectMapper: ObjectMapper, val next: Action ) extends ExitableAction with NameGen { override def statsEngine: StatsEngine = coreComponents.statsEngine override def name: String = genName("dubboRequest") override def execute(session: Session): Unit = recover(session) { argTypes(session) flatMap { argTypesArray => argValues(session) map { argValuesArray => val startTime = System.currentTimeMillis() val f = Future { try { genericService.$invoke(method, argTypes(session).get, argValues(session).get) } finally { } } f.onComplete { case Success(result) => val endTime = System.currentTimeMillis() val resultMap = result.asInstanceOf[JMap[String, Any]] val resultJson = objectMapper.writeValueAsString(resultMap) val (newSession, error) = Check.check(resultJson, session, checks) error match { case None => statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("OK"), None, None) throttle(newSession(session)) case Some(Failure(errorMessage)) => statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage)) throttle(newSession(session).markAsFailed) } case FuFailure(e) => val endTime = System.currentTimeMillis() statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage)) throttle(session.markAsFailed) } } } } private def throttle(s: Session): Unit = { if (throttled) { coreComponents.throttler.throttle(s.scenario, () => next ! s) } else { next ! s } } }
DubboActionBuilder 则是获取 Protocol 属性并初始化 Dubbo 客户端:
case class DubboActionBuilder(interface: String, method: String, argTypes: Expression[Array[String]], argValues: Expression[Array[Object]], checks: List[DubboCheck]) extends ActionBuilder { private def components(protocolComponentsRegistry: ProtocolComponentsRegistry): DubboComponents = protocolComponentsRegistry.components(DubboProtocol.DubboProtocolKey) override def build(ctx: ScenarioContext, next: Action): Action = { import ctx._ val protocol = components(protocolComponentsRegistry).dubboProtocol //Dubbo客户端配置 val reference = new ReferenceConfig[GenericService] val application = new ApplicationConfig application.setName("gatling-dubbo") reference.setApplication(application) reference.setProtocol(protocol.protocol) reference.setGeneric(protocol.generic) if (protocol.url == "") { val registry = new RegistryConfig registry.setProtocol(protocol.registryProtocol) registry.setAddress(protocol.registryAddress) reference.setRegistry(registry) } else { reference.setUrl(protocol.url) } reference.setInterface(interface) val cache = ReferenceConfigCache.getCache val genericService = cache.get(reference) val objectMapper: ObjectMapper = new ObjectMapper() new DubboAction(interface, method, argTypes, argValues, genericService, checks, coreComponents, throttled, objectMapper, next) } }
LambdaProcessBuilder 则提供了设置 Dubbo 泛化调用入参的 DSL 以及接下来要介绍的 Check 部分的 DSL
case class DubboProcessBuilder(interface: String, method: String, argTypes: Expression[Array[String]] = _ => Success(Array.empty[String]), argValues: Expression[Array[Object]] = _ => Success(Array.empty[Object]), checks: List[DubboCheck] = Nil) extends DubboCheckSupport { def argTypes(argTypes: Expression[Array[String]]): DubboProcessBuilder = copy(argTypes = argTypes) def argValues(argValues: Expression[Array[Object]]): DubboProcessBuilder = copy(argValues = argValues) def check(dubboChecks: DubboCheck*): DubboProcessBuilder = copy(checks = checks ::: dubboChecks.toList) def build(): ActionBuilder = DubboActionBuilder(interface, method, argTypes, argValues, checks) }
全链路压测中,我们都使用Json Path
校验 HTTP 请求结果,Dubbo 压测插件中,我们也实现了基于Json Path
的校验。实现 Check,必须实现 Gatling check 中的 Extender 和 Preparer:
package object dubbo { type DubboCheck = Check[String] val DubboStringExtender: Extender[DubboCheck, String] = (check: DubboCheck) => check val DubboStringPreparer: Preparer[String, String] = (result: String) => Success(result) }
基于Json Path
的校验逻辑:
trait DubboJsonPathOfType { self: DubboJsonPathCheckBuilder[String] => def ofType[X: JsonFilter](implicit extractorFactory: JsonPathExtractorFactory) = new DubboJsonPathCheckBuilder[X](path, jsonParsers) } object DubboJsonPathCheckBuilder { val CharsParsingThreshold = 200 * 1000 def preparer(jsonParsers: JsonParsers): Preparer[String, Any] = response => { if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson) jsonParsers.safeParseJackson(response) else jsonParsers.safeParseBoon(response) } def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) = new DubboJsonPathCheckBuilder[String](path, jsonParsers) with DubboJsonPathOfType } class DubboJsonPathCheckBuilder[X: JsonFilter]( private[check] val path: Expression[String], private[check] val jsonParsers: JsonParsers )(implicit extractorFactory: JsonPathExtractorFactory) extends DefaultMultipleFindCheckBuilder[DubboCheck, String, Any, X]( DubboStringExtender, DubboJsonPathCheckBuilder.preparer(jsonParsers) ) { import extractorFactory._ def findExtractor(occurrence: Int) = path.map(newSingleExtractor[X](_, occurrence)) def findAllExtractor = path.map(newMultipleExtractor[X]) def countExtractor = path.map(newCountExtractor) }
DubboCheckSupport 则提供了设置 jsonPath 表达式的 DSL
trait DubboCheckSupport { def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) = DubboJsonPathCheckBuilder.jsonPath(path) }
trait AwsDsl
提供顶层 DSL。我们还定义了 dubboProtocolBuilder2DubboProtocol、dubboProcessBuilder2ActionBuilder 两个 Scala 隐式方法,以自动构造 DubboProtocol 和 ActionBuilder。
此外,泛化调用中使用的参数类型为 Java 类型,而我们的压测脚本使用 Scala 编写,所以这里需要做两种语言间的类型转换,所以我们定义了 transformJsonDubboData 方法
trait DubboDsl extends DubboCheckSupport { val Dubbo = DubboProtocolBuilderBase def dubbo(interface: String, method: String) = DubboProcessBuilder(interface, method) implicit def dubboProtocolBuilder2DubboProtocol(builder: DubboProtocolBuilder): DubboProtocol = builder.build implicit def dubboProcessBuilder2ActionBuilder(builder: DubboProcessBuilder): ActionBuilder = builder.build() def transformJsonDubboData(argTypeName: String, argValueName: String, session: Session): Session = { session.set(argTypeName, toArray(session(argTypeName).as[JList[String]])) .set(argValueName, toArray(session(argValueName).as[JList[Any]])) } private def toArray[T:ClassTag](value: JList[T]): Array[T] = { value.asScala.toArray } }
object Predef extends DubboDsl
压测脚本示例:
import io.gatling.core.Predef._ import io.gatling.dubbo.Predef._ import scala.concurrent.duration._ class DubboTest extends Simulation { val dubboConfig = Dubbo .protocol("dubbo") .generic("true") //直连某台Dubbo机器,只单独压测一台机器的水位 .url("dubbo://IP地址:端口") //或设置注册中心,压测该Dubbo应用集群的水位,支持ETCD3注册中心 .registryProtocol("") .registryAddress("") val jsonFileFeeder = jsonFile("data.json").circular //数据Feeder val dubboScenario = scenario("load test dubbo") .forever("repeated") { feed(jsonFileFeeder) .exec(session => transformJsonDubboData("args_types1", "args_values1", session)) .exec(dubbo("com.xxx.xxxService", "methodName") .argTypes("${args_types1}") .argValues("${args_values1}") .check(jsonPath("$.code").is("200")) ) } setUp( dubboScenario.inject(atOnceUsers(10)) .throttle( reachRps(10) in (1 seconds), holdFor(30 seconds)) ).protocols(dubboConfig) }
data.json 示例:
[ { "args_types1": ["com.xxx.xxxDTO"], "args_values1": [{ "field1": "111", "field2": "222", "field3": "333" }] } ]
我的系列博客
混沌工程 - 软件系统高可用、弹性化的必由之路
异步系统的两种测试方法
我的其他测试相关开源项目
捉虫记:方便产品、开发、测试三方协同自测的管理工具
招聘
有赞测试组在持续招人中,大量岗位空缺,只要你来,就能帮你点亮全栈开发技能树,有意向换工作的同学可以发简历到 sunjun【@】youzan.com