liwenbocsu 2017-04-06
注:路由规则在注册的时候在从注册中心设置,实现dubbo本身的负载在dubbo框架中(和zk没关zk自己的负载是自己的客户端调用自己的服务)
消费端调用远程服务接口时,使用上和调用普通的Java接口是没有任何区别,但是服务消费者和提供者是跨JVM和主机的,客户端如何封装请求让服务端理解请求并且解析服务端返回的接口调用结果,服务端如何解析客户端的请求并且向客户端返回调用结果,这些框架是如何实现的,下面就来看下这部分的代码。
消费端调用提供端服务的过程要执行下面几个步骤:
1. 消费端触发请求
2. 消费端请求编码
3. 提供端请求解码
4. 提供端处理请求
5. 提供端响应结果编码
6. 消费端响应结果解码
总体过程:dubboinvoke -->nettychanle-->nettysever-->nettyhandle-->NettyClient-->nettyhandle(不同的方法)
在消费者初始化的时候,会生成一个消费者代理注册到容器中,该代理回调中持有一个MockClusterInvoker实例,消费调用服务接口时它的invoke会被调用,此时会构建一个RpcInvocation对象,把服务接口的method对象和参数放到RpcInvocation对象中,作为MockClusterInvoker.invoke方法的参数,在这个invoke方法中,判断请求是否需要mock,是否配置了mock属性,是强制mock还是失败后mock,关于mock这里先不详细展开,这里只看下核心流程。
MockClusterInvoker.invoke会调用FailfastClusterInvoker.invoke,大系统中为了服务高可用同一个服务一般会有多个应用服务器提供,要先挑选一个提供者提供服务。在服务接口消费者初始化时,接口方法和提供者Invoker对应关系保存在RegistryDirectory的methodInvokerMap中,通过调用的方法名称(或方法名称+第一个参数)改方法对应的提供者invoker列表,如注册中心设置了路由规则,对这些invoker根据路由规则进行过滤。
com.alibaba.dubbo.registry.integration.RegistryDirectory.doList(Invocation)
com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory.list(Invocation)
读取到所有符合条件的服务提供者invoker之后,由LoadBalance组件执行负载均衡,从中挑选一个invoker进行调用,框架内置支持的负载均衡算法包括random(随机)、roundrobin(R-R循环)、leastactive(最不活跃)、consistenthash(一致性hash),应用可配置,默认random。
methodInvokerMap保存的是持有DubboInvoker(dubbo协议)实例的InvokerDelegete对象,是Invoker-Filter链的头部,先激活Filter连然后最终调到DubboInvoker.invoke(RpcInvocation),此时远程调用分三种类型:
1. 单向调用,无需获取关注调用结果的,无需等待接口返回结果,注意调用结果不要单纯跟返回值混淆了,异常也是调用结果。
2. 异步调用,需要关注返回结果,但是不会同步等待接口调用结束,会异步的获取返回返回结果,这种情况给调用者返回一个Future,但是不同步等待Future.get返回调用结果
3. 同步调用,需要同步等待服务调用结束获取调用结果,给调用者返回一个Future并且Future.get等待结果,此时接口调用线程会挂起等待响应。 (立即Future.get)
我们大部分使用场景都是同步调用,所以主要看一下同步调用。如果使用者配置了多个connections按顺序选择一个ExchangeClient和服务器通信,同步调用时调用HeaderExchangeClient.request->HeaderExchangeChannel.request。
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(Object, int)
这里的request参数是RpcInvocation对象,包含调用的方法、参数等信息,timeout参数是接口超时时间,把这些信息封装在Request对象中,调用channel.send,这个channel对象就是和服务端打交道的NettyClient实例,NettyClient.send调用NettyChannel.send。
com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(Object, boolean)
这里的sent参数决定是否等待请求消息发出,sent=true 等待消息发出,消息发送失败将抛出异常,sent=false 不等待消息发出,将消息放入IO队列,即刻返回。默认情况下都是false。NettyChannel中有channel属性,这个channel是Netty框架中的组件,负责客户端和服务端链路上的消息传递,channel.write把请求消息写入,这里的message是上面封装的Request对象。这里的IO模型是非阻塞的,线程不用同步等待所有消息写完,而是直接返回。调用Netty框架的IO事件之后会触发Netty框架的IO事件处理链。
在消费者初始化创建NettyClient时了解到了,NettyClient添加了三个事件处理器组成处理器链:NettyCodecAdapter.decoder->NettyCodecAdapter.encoder->NettyHandler,其中NettyCodecAdapter.encoder下行事件处理器(实现了ChannelDownstreamHandler接口),NettyCodecAdapter. decoder是上行事件处理器(实现了ChannelUpstreamHandler接口),NettyHandler是上行事件+下行时间处理器(同时实现了ChannelUpstreamHandler和ChannelDownstreamHandler接口)。channel.write在Netty框架中是一个下行事件,所以NettyCodecAdapter.encoder和NettyHandler处理器会被回调,下行事件的事件处理器调用顺序是从后到前,即后添加的处理器先执行。
NettyHandler没有对请求消息做任何加工,只是触发dubbo框架的一些回调,这些回调里面没有做任何核心的事情,
com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(ChannelHandlerContext, MessageEvent)
encoder顾名思义就是编码器,它的主要工作就是把数据按照客户端-服务端的约定协议对请求信息和返回结果进行编码。看下它的encode方法:
下行事件触发之后依次调用handleDownstream->doEncode->encode,在encode中对Request对象进行编码。这个msg参数就是上面被write的Request对象,这里的Codec2组件是DubboCountCodec实现,DubboCountCodec.encode调用DubboCodec.Encode
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encode(Channel, ChannelBuffer, Object)
netty
消息头+消息体(invoker对象)
消息头简单直接就是消息体长度,复杂一点可以还包括消息id,请求类型,协议,序列化等
根据协议,消息中写入16个字节的消息头: (消息传送的编码思路可以参考此)
1、1-2字节,固定的魔数
2、第3个字节,第8位存储数据类型是请求数据还是响应数据,其它7位存储序列化类型,约定和服务端的序列化-反序列化协议
3、5-12个字节,请求id
4、13-16个字节,请求数据长度
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeRequest(Channel, ChannelBuffer, Request)
从URL中查找序列化扩展点名称,加载序列化组件把请求对象序列化成二进制。消费端和提供端的序列化反序列化协议要配套,所以这个序列化协议一般是在提供端指定的,指定的协议类型会在提供者和消费者初始化的时候写入到URL对象中,框架中默认的序列化协议是hessian2。消息体数据包含dubbo版本号、接口名称、接口版本、方法名称、参数类型列表、参数、附加信息,把它们按顺序依次序列化,数据写入到类型为ChannelBuffer的buffer参数中,然后把ChannelBuffer封装成Netty框架的org.jboss.netty.buffer.ChannelBuffer。如果参数中有回调接口,还需要在消费端启动端口监听提供端的回调,这里不展开。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeRequestData(Channel, ObjectOutput, Object)
然后把封装好的ChannelBuffer写到链路发送到服务端,这里消费端前半部分的工作就完成,接下来目光要转移到服务端。
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(ChannelHandlerContext, MessageEvent)
在看提供端初始化代码的时候看到,框架在创建NettyServer时,也会创建netty框架的IO事件处理器链:NettyCodecAdapter.decoder->NettyCodecAdapter.encoder->NettyHandler
com.alibaba.dubbo.remoting.transport.netty.NettyServer.doOpen()
客户端发送数据到服务端时会触发服务端的上行IO事件并且启动处理器回调,NettyCodecAdapter.decoder和NettyHandler是上行事件处理器,上行事件处理器调用顺序是从前到后执行,即先添加的处理器先执行,所以先触发NettyCodecAdapter.decoder再触发NettyHandler。
由NettyCodecAdapter.decoder对请求进行解码,把消息翻译成提供端可理解的,上行事件调用decoder的handleUpstream->messageReceived
com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder.messageReceived(ChannelHandlerContext, MessageEvent)
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.decode(Channel, ChannelBuffer, int, byte[])
把数据读取到ChannelBuffer之后扔给Codec2组件进行解码处理,这里有个半包传输处理,因为这里使用的是非阻塞式的IO模型,非阻塞IO的特点是线程的读取数据是事件触发式,是由一个Selector组件轮询准备就绪的IO事件,发现准备就绪的事件之后通知线程读取,这种模式的好处是可以极大的优化线程模型,只需少数几个线程处理所有客户端和服务端连接,而阻塞IO需要线程和连接要一对一,但是非阻塞IO远高于阻塞式IO,不像阻塞式IO读写数据时只有数据读完或者超时才会返回,这样能保证读到的数据肯定是完整,而非阻塞模式方法返回之后可能只读到一部分数据,框架的处理是在解析消息时检查消息的长度确定是否有完整的数据,如果数据不完整返回NEED_MORE_INPUT,保存当前解析的位置等待链路的下次IO事件,在下次IO事件到达时从上次保存的位置开始解析。
读取到完整的数据之后解析数据头,读取魔数、序列化类型、以及请求id,读取第3个字节判断改数据是消费端请求数据还是提供端响应数据(因为消费端和提供端解码器代码是共用的),并且从1-7位从读出序列化类型,并且根据此序列化类型加载序列化组件对消息进行反序列化按顺序读取消费端写入的dubbo版本号、接口名称、接口版本、方法名称、参数类型列表、参数、附加信息,写入DecodeableRpcInvocation对象对应的属性中。(基于消息长度,读取到完整数据流为止)
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.decodeBody(Channel, InputStream, byte[])
com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation.decode(Channel, InputStream)
创建一个Request对象,把DecodeableRpcInvocation对象对象设置到Request对象的data属性中。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.decodeBody(Channel, InputStream, byte[])
解码完成之后,激活下一个处理器的messageReceived事件,并且把解码后的对象封装在MessageEvent中。
com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder.messageReceived(ChannelHandlerContext, MessageEvent)
org.jboss.netty.channel.Channels.fireMessageReceived(ChannelHandlerContext, Object, SocketAddress)
Decoder执行完之后,事件进入到下一个处理器NettyHandler,看下NettyHandler中的代码:
这里直接交给handler处理了,这个handler封装了很多层:DecodeHandler->HeaderExchangeHandler->DubboProtocol.requestHandler,中间封装了好几万层这里只把重要的列出来,源头是从创建NettyServer的时候传过来的。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.createServer(URL)
先会走到DecodeHandler.received:
com.alibaba.dubbo.remoting.transport.DecodeHandler.received(Channel, Object)
这个message是Request类型的,要先decode一下,因为在之前已经解码过了,所以这里不会做任何事情,直接走下一个handler.received,这个handler就是HeaderExchangeHandler:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(Channel, Object)
普通的同步接口twoWay属性是true走handleRequest方法处理请求,处理结束之后调用channel.send把结果返回到客户端。
请求处理再走下一个handler的reply,这个handler就是DubboProtocol.requestHandler,把request对象中的data取出来传到requestHandler中,这个data就是前面的解码后的DecodeableRpcInvocation对象它是Invocation接口的一个实现:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(ExchangeChannel, Request)
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
查找提供端请求对应的Invoker,在接口提供者初始化时,每个接口都会创建一个Invoker和Exporter,Exporter持有invoker实例,Exporter对象保存在DubboProtocol的exporterMap中,key是由URL生成的serviceKey,此时通过Invocation中的信息就可还原该serviceKey并且找到对应的Exporter和Invoker,在分析提供者初始化代码时知道它是Invoker-Filter的头节点,激活Filter后调用由ProxyFactory生成的Invoker:
调用invoker.invoke时,通过反射调用最终的服务实现执行相关逻辑。
服务执行结束之后,创建一个Response对象返回给客户端。在执行服务实现时会出现两种结果:成功和失败,如果成功,把返回值设置到Response的result中,Response的status设置成OK,如果失败,把失败异常设置到Response的errorMessage中,status设置成SERVICE_ERROR。
回到HeaderExchangeHandler.received中的代码,在handleRequest之后,调用channel.send把Response发送到客户端,这个channel封装客户端-服务端通信链路,最终会调用Netty框架,把响应写回到客户端。
提供端要按照和消费端的协议把Response按照特定的协议进行编码,把编码后的数据写回到消费端,从上面的代码可以看到,在NettyServer初始化的时候,定义了三个IO事件处理器,服务端往客户端回写响应时产生下行事件,处理下行事件处理器,NettyCodecAdapter.encoder和NettyHandler是下行事件处理器,先激活NettyHandler,再激活NettyCodecAdapter. encoder,在NettyCodecAdapter. encoder对响应结果进行编码,还是通过Code2组件和请求编码时使用的组件一样,把响应类型和响应结果依次写回到客户端,根据协议会写入16个字节的数据头,包括:
1、1-2字节魔数
2、第3个字节,序列化组件类型,约定和客户端的序列化-反序列化协议
3、第4个字节,响应状态,是OK还是error
4、5-13个字节,响应id,这里的id和request中的id一样
5、13-16个字节,响应数据长度
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeResponse(Channel, ChannelBuffer, Response)
返回结果有三种结果:1、没有返回值即返回类型是void;2、有返回值并且执行成功;3、服务调用异常。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeResponseData(Channel, ObjectOutput, Object)
解码后的数据会写入到通信链路中。
服务端给客户端回写数据之后,客户端会收到IO事件,一个上行事件。NettyClient中有两个上行事件处理器NettyCodecAdapter.decoder和NettyHandler,按照顺序decoder先执行对服务端传过来的数据进行解码,解析出序列化协议、响应状态、响应id(即请求id)。把响应body数据读到DecodeableRpcResult对象中,进行解析同时加载处理原始Request数据,这个Request对象在请求时会被缓存到DefaultFuture中,加载Request的目的是因为Request中Invocation中携带了服务接口的返回值类型信息,需要根据这个类型把响应解析创建对应类型的对象。
com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcResult.decode(Channel, InputStream)
创建Response对象并且把解析出结果或异常设置到Response中。
decoder把响应解析成Response对象中,NettyHandler接着往下处理,同样触发它的messageReceive事件,在提供端解码的时候看到了,它的handler封装关系是:DecodeHandler->HeaderExchangeHandler->DubboProtocol.requestHandler,主要处理在HeaderExchangeHandler中:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleResponse(Channel, Response)
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.doReceived(Response)
这里主要做的事情是唤醒调用者线程,并且把Response设置到DefaultFuture中,在消费者触发请求的代码中可以看到,消费端调用接口的时候请求写到提供端之后,会调用DefaultFuture.get阻塞等待响应结果:
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.get(int)
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.isDone()
在done这个Condition上进行条件等待,DefaultFuture.doReceive时,设置response唤醒done,此时调用线程被唤醒并且检查是否已经有了response(避免假唤醒),唤醒之后返回response中的result,调用端即拿到了接口的调用结果(返回值或异常),整个远程服务接口的调用流程就完成了。
前面说了在进行接口调用时会出现两种情况:接口调用成功、接口调用异常,其实还有一种情况就是接口调用超时。在消费端等待接口返回时,有个timeout参数,这个时间是使用者设置的,可在消费端设置也可以在提供端设置,done.await等待时,会出现两种情况跳出while循环,一是线程被唤醒并且已经有了response,二是等待时间已经超过timeout,此时也会跳出while,当跳出while循环并且Future中没有response时,就说明接口已超时抛出TimeoutException,框架把TimeoutException封装成RpcException抛给应用层。
参考:
http://blog.csdn.net/tolihaifeng/article/details/60972120