xuMelon 2020-03-03
学了一段时间的netty知识,现在通过这个基于console的程序来对netty的相关接口做个简单的应用。
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.44.Final</version> </dependency>
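我们都知道,一个典型的netty程序绝大部分是基于以下三部曲来走的:(1)编写启动类(ServerBootstrap / Bootstrap);(2)编写 ChannelInitializer,向 ChannelPipeline 中添加处理器;(3)编写自定义的 ChannelHandler 处理业务逻辑。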
按照以上的三部曲思路,就可以实现自己的网络程序了。
/**
 * Entry point of the console chat server.
 *
 * <p>Binds a Netty server to port 8899 and blocks the main thread until the
 * server channel is closed. Both event loop groups are shut down on exit.
 */
public class MyChatServer {

    public static void main(String[] args) {
        // Passed to ServerBootstrap.group(parent, child): bossGroup is the parent group,
        // workerGroup is the child group.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // The initializer class is MyChatServerInitializer; the previous
                    // spelling ("Initializeor") named a class that does not exist.
                    .childHandler(new MyChatServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers higher up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 粘包、粘包处理器 pipeline.addLast("DelimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter())); pipeline.addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("StringEncoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyServerChannelHandler()); } }
public class MyServerChannelHandler extends SimpleChannelInboundHandler<String> { /** * 保存channel对象 */ private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static final String DATE_PARTTEN = "yyyy-MM-dd HH:mm:ss:SSS"; @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); channelGroup.forEach(ch -> { // 当前遍历的channel不是发送msg的channel对象。则向其他客户端广播 if (channel != ch) { ch.writeAndFlush(channel.remoteAddress() + ", 发送的消息" + msg + "\n"); } else { ch.writeAndFlush("[自己] " + msg + " \n"); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress() + " 上线了!"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress() + " 离开了!"); } /** * 客户端链接建立的时候调用 * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //super.handlerAdded(ctx); // 服务端与客户端建立 Channel channel = ctx.channel(); // 向其他链接的客户端发送广播信息 SocketAddress socketAddress = channel.remoteAddress(); String date = DateTimeFormatter.ofPattern(DATE_PARTTEN).format(LocalDateTime.now()); // 向channelGroup中的每一个channel对象发送一个消息 channelGroup.writeAndFlush(date + " [服务器] - " + socketAddress + " 加入 \n"); // 保存该客户端链接 channelGroup.add(channel); } /** * 链接断开 * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); String date = DateTimeFormatter.ofPattern(DATE_PARTTEN).format(LocalDateTime.now()); channelGroup.writeAndFlush(date + " [服务器] - " + channel.remoteAddress() + " 离开 \n"); } /** * 客户端注册进来 * @param ctx * @throws 
Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); } }
/**
 * Entry point of the console chat client.
 *
 * <p>Connects to {@code localhost:8899} and sends every line read from standard
 * input to the server, terminated with {@code "\r\n"}. Stops when standard input
 * reaches end-of-stream.
 */
public class MyChatClient {

    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    // The initializer class is MyClientChannelInitializer; the previous
                    // spelling ("Initializor") named a class that does not exist.
                    .handler(new MyClientChannelInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            Channel channel = channelFuture.channel();

            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            String line;
            // readLine() returns null at end-of-stream; stop instead of
            // sending the literal text "null" in an endless loop.
            while ((line = bufferedReader.readLine()) != null) {
                channel.writeAndFlush(line + "\r\n");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers higher up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
/**
 * Configures the client channel pipeline: line-delimited framing, UTF-8 string
 * decoding/encoding, then the client chat handler.
 */
public class MyClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Frames inbound bytes on line delimiters, max frame length 4096.
                .addLast("DelimiterBasedFrameDecoder",
                        new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
                .addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8))
                .addLast("StringEncoder", new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new MyChatClientChannelHandler());
    }
}
public class MyChatClientChannelHandler extends SimpleChannelInboundHandler<String> { // 直接打印服务端返回的消息 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); } }