Fightingxr 2020-02-19
在网络传输中,怎么确保通道连接的可用性是一个很重要的问题,简单的说,在网络通信中有客户端和服务端,一个负责发送请求,一个负责接收请求,在保证连接有效性的背景下,这两个物体扮演了什么角色,心跳机制能有效的保证连接的可用性,那它的机制是什么,下文中将会详细讲解。
首先讲一下TCP,在dubbo中的通信是基于TCP的,TCP本身并没有长短连接的区别,在短连接中,每次通信时,都会创建Socket,当该次通信结束后,就会调用socket.close();而在长连接中,每次通信完毕后,不会关闭连接,这样就可以做到连接的复用,长连接的好处是省去了创建连接时的耗时。那么如何确保连接的有效性呢,在TCP中用到了KeepAlive机制,keepalive并不是TCP协议的一部分,但是大多数操作系统都实现了这个机制,在一定时间内,在链路上如果没有数据传送的情况下,TCP层将会发送相应的keepalive探针来确定连接可用性,探测失败后重试10次(tcp_keepalive_probes
),每次间隔时间为75s(tcp_keepalive_intvl
),所有探测失败后,才认为当前连接已经不可用了。
KeepAlive机制是在网络层保证了连接的可用性,但在应用层我们认为这还是不够的。
KeepAlive本身是面向网络的,并不是面向于应用的,可能是由于本身GC问题,系统load高等情况,但网络依然是通的,此时,应用已经失去了活性,所以连接自然认为是不可用的。
如何理解应用层的心跳?简单的说,就是客户端会开启一个定时任务,定时对已经建立连接的对端应用发送请求,服务端则需要特殊处理该请求,返回响应。如果心跳持续多次没有收到响应,客户端会认为连接不可用,主动断开连接。
在失败的场景下,服务端是不会返回响应的,所以只能在客户端自身上设计了。
当客户端发起一个RPC请求时,会设置一个超时时间client_timeout,同时它也会开启一个延迟的client_timeout的定时器。当接收到正常响应时,会移除该定时器;而当计时器倒计时完毕后,还没有被移除,则会认为请求超时,构造一个失败的响应传递给客户端。
HeaderExchangeClient类
public HeaderExchangeClient(Client client, boolean needHeartbeat) { if (client == null) { throw new IllegalArgumentException("client == null"); } this.client = client; // 创建信息交换通道 this.channel = new HeaderExchangeChannel(client); // 获得dubbo版本 String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); //获得心跳周期配置,如果没有配置,并且dubbo是1.0版本的,则这只为1分钟,否则设置为0 this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); // 获得心跳超时配置,默认是心跳周期的三倍 this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (needHeartbeat) { // 开启心跳 long tickDuration = calculateLeastDuration(heartbeat); heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true) , tickDuration, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL); startHeartbeatTimer(); } }
创建了一个HashedWheelTimer
开启心跳检测,这是 Netty 所提供的一个经典的时间轮定时器实现。
HeaderExchangeServer
也同时开启了定时器,代码逻辑和上述差不多。
private void startHeartbeatTimer() { long heartbeatTick = calculateLeastDuration(heartbeat); long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout); HeartbeatTimerTask heartBeatTimerTask =new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout); heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS); }
在该方法中主要开启了两个定时器
final class HeartBeatTask implements Runnable { private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); /** * 通道管理 */ private ChannelProvider channelProvider; /** * 心跳间隔 单位:ms */ private int heartbeat; /** * 心跳超时时间 单位:ms */ private int heartbeatTimeout; HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) { this.channelProvider = provider; this.heartbeat = heartbeat; this.heartbeatTimeout = heartbeatTimeout; } @Override public void run() { try { long now = System.currentTimeMillis(); // 遍历所有通道 for (Channel channel : channelProvider.getChannels()) { // 如果通道关闭了,则跳过 if (channel.isClosed()) { continue; } try { // 最后一次接收到消息的时间戳 Long lastRead = (Long) channel.getAttribute( HeaderExchangeHandler.KEY_READ_TIMESTAMP); // 最后一次发送消息的时间戳 Long lastWrite = (Long) channel.getAttribute( HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); // 如果最后一次接收或者发送消息到时间到现在的时间间隔超过了心跳间隔时间 if ((lastRead != null && now - lastRead > heartbeat) || (lastWrite != null && now - lastWrite > heartbeat)) { // 创建一个request Request req = new Request(); // 设置版本号 req.setVersion(Version.getProtocolVersion()); // 设置需要得到响应 req.setTwoWay(true); // 设置事件类型,为心跳事件 req.setEvent(Request.HEARTBEAT_EVENT); // 发送心跳请求 channel.send(req); if (logger.isDebugEnabled()) { logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); } } // 如果最后一次接收消息的时间到现在已经超过了超时时间 if (lastRead != null && now - lastRead > heartbeatTimeout) { logger.warn("Close channel " + channel + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms"); // 如果该通道是客户端,也就是请求的服务器挂掉了,客户端尝试重连服务器 if (channel instanceof Client) { try { // 重新连接服务器 ((Client) channel).reconnect(); } catch (Exception e) { //do nothing } } else { // 如果不是客户端,也就是是服务端返回响应给客户端,但是客户端挂掉了,则服务端关闭客户端连接 channel.close(); } } } catch (Throwable t) { logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); } } } catch (Throwable t) { logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t); } } interface ChannelProvider { // 获得所有的通道集合,需要心跳的通道数组 Collection<Channel> getChannels(); } }
它首先遍历所有的Channel,在服务端对用的是所有客户端连接,在客户端对应的是服务端连接,判断当前TCP连接是否空闲,如果空闲就发送心跳报文,判断是否空闲,根据Channel是否有读或写来决定,比如一分钟内没有读或写就发送心跳报文,然后是处理超时的问题,处理客户端超时重新建立TCP连接,目前的策略是检查是否在3分钟内都没有成功接受或发送报文,如果在服务端检测则就会主动关闭远程客户端连接。
在新版本下,去除了HeartBeatTask类,添加了HeartbeatTimerTask和ReconnectTimerTask类
public class HeartbeatTimerTask extends AbstractTimerTask { private static final Logger logger = LoggerFactory.getLogger(HeartbeatTimerTask.class); private final int heartbeat; HeartbeatTimerTask(ChannelProvider channelProvider, Long heartbeatTick, int heartbeat) { super(channelProvider, heartbeatTick); this.heartbeat = heartbeat; } @Override protected void doTask(Channel channel) { try { Long lastRead = lastRead(channel); Long lastWrite = lastWrite(channel); if ((lastRead != null && now() - lastRead > heartbeat) || (lastWrite != null && now() - lastWrite > heartbeat)) { Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setEvent(Request.HEARTBEAT_EVENT); channel.send(req); if (logger.isDebugEnabled()) { logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); } } } catch (Throwable t) { logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); } } }
Dubbo采取的是双向心跳设计,即服务端会向客户端发送心跳,客户端也会向服务端发送心跳,接收的一方更新lastread字段,发送的一方更新lastWrite字段,超过心跳间隙的时间,便发送心跳请求给对端。
public class ReconnectTimerTask extends AbstractTimerTask { private static final Logger logger = LoggerFactory.getLogger(ReconnectTimerTask.class); private final int idleTimeout; public ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int idleTimeout) { super(channelProvider, heartbeatTimeoutTick); this.idleTimeout = idleTimeout; } @Override protected void doTask(Channel channel) { try { Long lastRead = lastRead(channel); Long now = now(); // Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection if (!channel.isConnected()) { try { logger.info("Initial connection to " + channel); ((Client) channel).reconnect(); } catch (Exception e) { logger.error("Fail to connect to " + channel, e); } // check pong at client } else if (lastRead != null && now - lastRead > idleTimeout) { logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: " + idleTimeout + "ms"); try { ((Client) channel).reconnect(); } catch (Exception e) { logger.error(channel + "reconnect failed during idle time.", e); } } } catch (Throwable t) { logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t); } } }
不同类型处理机制不同,当超过设置的心跳总时间后,客户端选择的是重新连接,服务端是选择直接断开连接。
Netty对空闲连接的检测提供了天然的支持,使用IdleStateHandler可以很方便的实现空闲检测逻辑。
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit){}
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast("clientIdleHandler", new IdleStateHandler(60, 0, 0)); } });
服务端:
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast("serverIdleHandler",new IdleStateHandler(0, 0, 200)); } }
从上面看出,客户端配置了read超时为60s,服务端配置了write/read超时未200s,
对于空闲超时的处理逻辑,客户端和服务端是不同的,首先来看客户端的:
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { // send heartbeat sendHeartBeat(); } else { super.userEventTriggered(ctx, evt); } }
检测到空闲超时后,采取的行为是向服务端发送心跳包,
public void sendHeartBeat() { Invocation invocation = new Invocation(); invocation.setInvocationType(InvocationType.HEART_BEAT); channel.writeAndFlush(invocation).addListener(new CallbackFuture() { @Override public void callback(Future future) { RPCResult result = future.get(); //超时 或者 写失败 if (result.isError()) { channel.addFailedHeartBeatTimes(); if (channel.getFailedHeartBeatTimes() >= channel.getMaxHeartBeatFailedTimes()) { channel.reconnect(); } } else { channel.clearHeartBeatFailedTimes(); } } }); }
构造一个心跳包发送到服务端,接受响应结果
响应失败,心跳失败标记+1,如果超过配置的失败次数,则重新连接
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { channel.close(); } else { super.userEventTriggered(ctx, evt); } }
服务端直接关闭连接。