RocketMQ 源码之 异步和同步请求是怎么做到的

lypgcs 2020-01-28

不管是DefaultMQProducer还是DefaultMQPushConsumer,本质都是封装类,发起请求的实际上是RemotingClient,

它的start方法调用之后,启动了一个netty的客户端bootstrap,每次需要与nameService或者broker进行连接的时候,调用

getAndCreateChannel方法,从一个map中创建或者获取channel(创建的时候nameService和broker两者的区别在于addr参数是不是为null),

连接建立之后,发起请求调用的是invokeSync和invokeAsync,点进去看:

同步invokeSync的实现是新建一个responseFuture,放到responseTable中(key是自增的requestId),然后调用channel.writeAndFlush(request),

发起请求,最后调用responseFuture.waitResponse,等待响应。让线程等待用的是countDownLatch,那么latch之后怎样放行呢?

数据的发出是writeAndFlush,进来就应该是在channel的read方法中,去查看bootstrap的构造过程,发现添加的handler中有

NettyClientHandler,点进去一看重写了channelRead0方法(在其父类的channelRead方法被调用),里面有processMessageReceived,

点进去发现根据收到的RemotingCommand的type,是对方的主动请求还是对自己之前请求的应答,在我们这里,应该是后者,所以

进入processResponseCommand,一看,果然是从上面说的responseTable再把responseFuture取出来,如果有回调方法,这里是同步请求

,没有(待会说的异步请求才有),那么进入responseFuture.putResponse(cmd),点进去看,看见了countDownLatch.countDown(),至此

同步方法逻辑通了。

一些思考:其实不管是同步还是异步, 请求过去再回来,

相关推荐