fengshantao 2015-12-23
上一章已经讲了如何搭建一个简单的nettyserver,这一章讲一下netty超时心跳机制。
一般应用场景是client在一定时间未收到server端数据时给server端发送心跳请求,server收到心跳请求后发送一个心跳包给client端,以此维持通信。
发送心跳由client执行,server端反馈心跳就可以了,好了不多说了,上代码:
nettyserver代码:
import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.timeout.IdleStateHandler; public class NettyServerBootstrap { private static Logger logger = Logger.getLogger(NettyServerBootstrap.class); private int port; public NettyServerBootstrap(int port) { this.port = port; bind(); } private void bind() { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker); bootstrap.channel(NioServerSocketChannel.class); bootstrap.option(ChannelOption.SO_BACKLOG, 1024); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); // 超时处理:参数分别为读超时时间、写超时时间、读和写都超时时间、时间单位 p.addLast(new IdleStateHandler(15, 30, 30, TimeUnit.SECONDS)); p.addLast(new NettyIdleStateHandler()); p.addLast(new NettyServerHandler()); } }); ChannelFuture f = bootstrap.bind(port).sync(); if (f.isSuccess()) { logger.debug("启动Netty服务成功,端口号:" + this.port); } // 关闭连接 f.channel().closeFuture().sync(); } catch (Exception e) { logger.error("启动Netty服务异常,异常信息:" + e.getMessage()); e.printStackTrace(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
server端NettyIdleStateHandler:
import org.apache.log4j.Logger; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; /** * 处理超时连接handler * * @author Guo Kaixuan * */ public class NettyIdleStateHandler extends ChannelHandlerAdapter { private static Logger logger = Logger .getLogger(NettyIdleStateHandler.class); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.READER_IDLE)) { logger.warn("Netty服务器在通道" + ctx.channel().id() + "上读超时,已将该通道关闭"); } else if (event.state().equals(IdleState.WRITER_IDLE)) { logger.warn("Netty服务器在通道" + ctx.channel().id() + "上写超时,已将该通道关闭"); } else if (event.state().equals(IdleState.ALL_IDLE)) { logger.warn("Netty服务器在通道" + ctx.channel().id() + "上读&写超时,已将该通道关闭"); } super.userEventTriggered(ctx, evt); } } }
server端NettyServerHandler
import java.io.UnsupportedEncodingException; import org.apache.log4j.Logger; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.SocketChannel; public class NettyServerHandler extends ChannelHandlerAdapter { private static Logger logger = Logger.getLogger(NettyServerHandler.class); @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf buf = (ByteBuf) msg; String recieved = getMessage(buf); System.out.println("服务器接收到消息:" + recieved); try { ctx.writeAndFlush(newPing("服务器已收到心跳包")); } catch (Exception e) { ServiceLoggerUtils.error("通信异常"); return; } } /* * 从ByteBuf中获取信息 使用UTF-8编码返回 */ private String getMessage(ByteBuf buf) { byte[] con = new byte[buf.readableBytes()]; buf.readBytes(con); try { return new String(con, Constant.UTF8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); return null; } } public static ByteBuf newPing(String message) { byte[] mes = message.getBytes(); ByteBuf pingMessage = Unpooled.buffer(); pingMessage.writeBytes(mes); return pingMessage; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { } }
client端代码:
import java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.timeout.IdleStateHandler; public class NettyClientBootstrap { /* * 服务器端口号 */ private int port; /* * 服务器IP */ private String host; @SuppressWarnings("unused") private SocketChannel socketChannel; public NettyClientBootstrap(int port, String host) throws InterruptedException { this.port = port; this.host = host; start(); } private void start() throws InterruptedException { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.group(eventLoopGroup); bootstrap.remoteAddress(host, port); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //超时处理:参数分别为读超时时间、写超时时间、读和写都超时时间、时间单位 socketChannel.pipeline().addLast(new IdleStateHandler(3, 8, 0, TimeUnit.SECONDS)); socketChannel.pipeline().addLast(new NettyIdleStateHandler()); socketChannel.pipeline().addLast(new NettyClientHandler()); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); if (future.isSuccess()) { socketChannel = (SocketChannel) future.channel(); System.out.println("----------------connect server success----------------"); } future.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { NettyClientBootstrap bootstrap = new NettyClientBootstrap(9999, "localhost"); } }
client端NettyIdleStateHandler:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; /** * 处理超时连接handler * * @author * */ public class NettyIdleStateHandler extends ChannelHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.READER_IDLE)) { String message = "心跳包"; byte[] req = message.getBytes(); ByteBuf pingMessage = Unpooled.buffer(); pingMessage.writeBytes(req); ctx.writeAndFlush(pingMessage); System.out.println("客户端读超时,已发送心跳"); } else if (event.state().equals(IdleState.WRITER_IDLE)) { System.out.println("客户端写超时"); } else if (event.state().equals(IdleState.ALL_IDLE)) { System.out.println("客户端读&写超时"); } super.userEventTriggered(ctx, evt); } } }
client端NettyClientHandler:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class NettyClientHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String result = getMessage((ByteBuf) msg); System.out.print("客户端收到服务器响应数据:" + result); } private String getMessage(ByteBuf buf) { byte[] con = new byte[buf.readableBytes()]; buf.readBytes(con); try { return new String(con, Constant.UTF8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); return null; } } }
以上代码需要引入netty5包和log4j包,netty包上一张有将如何加入,log4j请自行下载或者maven添加。如有问题欢迎留言。