gary00 2013-07-19
Netty是由JBOSS提供的一个java开源框架,基于nio。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
官网:http://netty.io/
websocket通信示例如下(以下使用的netty版本为3.6.1):
说明:
通信服务端运用Netty,Netty中自带Jboss,iphone使用SocketRocket,进行推送数据。
随便画的草图:
SocketRocket关健代码(引入SocketRocket相关包和文件,官网上有下):
- (void)viewDidLoad { [super viewDidLoad]; //Socket _webSocket.delegate = nil; [_webSocket close]; _webSocket = [[SRWebSocket alloc] initWithURLRequest:[NSURLRequest requestWithURL:[NSURL URLWithString:@"ws://127.0.0.0:8080/websocket"]]]; _webSocket.delegate = self; [_webSocket open]; NSLog(@"open success!"); } #pragma mark - SRWebSocketDelegate - (void)webSocketDidOpen:(SRWebSocket *)webSocket; { NSLog(@"Websocket Connected"); self.title = @"Connected!"; } - (void)webSocket:(SRWebSocket *)webSocket didFailWithError:(NSError *)error; { NSLog(@":( Websocket Failed With Error %@", error); _webSocket = nil; } - (void)webSocket:(SRWebSocket *)webSocket didReceiveMessage:(id)message; { NSLog(@"Received \"%@\"", message); self.showTxt.text = message; } - (void)webSocket:(SRWebSocket *)webSocket didCloseWithCode:(NSInteger)code reason:(NSString *)reason wasClean:(BOOL)wasClean; { NSLog(@"WebSocket closed"); self.title = @"Connection Closed! (see logs)"; _webSocket = nil; }
网页WebSocket关代码:
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html lang="zh-CN"> <head> <meta charset="utf-8"> <meta name="viewport" content="width=device-width"/> <!-- WebSocket --> <!-- 注:有的浏览器需要判断进行特别处理websocket --> <script type="text/javascript"> var websocket; function init(){ window.WebSocket = window.WebSocket || window.MozWebSocket; if(!window.WebSocket){ alert("WebSocket not support browser!"); return; } websocket = new WebSocket("ws://127.0.0.0:8080/websocket"); websocket.onopen = function (evt){ onOpen(evt) }; websocket.onclose = function (evt){ onClose(evt) }; websocket.onmessage = function (evt){ onMessage(evt) }; websocket.onerror = function (evt){ onError(evt) }; } function onOpen(evt){ //alert("connect open!"); } function onClose(evt){ //alert("connect close!"); } function onMessage(evt){ /* var data = evt.data; if(!data){ return; } alert(data); */ var msg = document.getElementById('responseText'); msg.value = msg.value+"\n"+evt.data; } function onError(evt){ alert("connect error!"); } //发送消息 function send(message){ if(!window.WebSocket){ return; } if(websocket.readyState == WebSocket.OPEN){ websocket.send(message); }else{ alert("The websocket is not open!"); } } </script> </head> <body onload="init()"> <form onsubmit="return false;"> <input type="text" name="message"></input><br/> <input type="button" value="发送" onclick="send(this.form.message.value)"></input><br/> <h3>输出:</h3> <textarea id="responseText" style="width:500px;height:300px;"></textarea> </form> </body> </html>
Netty代码:
public class WebSocketServer { private final int port; public WebSocketServer(int port) { this.port = port; } public void run() { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the event pipeline factory. bootstrap.setPipelineFactory(new WebSocketServerPipelineFactory()); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(port)); System.out.println("Web socket server started at port " + port + '.'); System.out.println("Open your browser and navigate to http://localhost:" + port + '/'); } public static void main(String[] args) { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new WebSocketServer(port).run(); } }
/** * Handles handshakes and messages */ public class WebSocketServerHandler extends SimpleChannelUpstreamHandler { private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocketServerHandler.class); private static final String WEBSOCKET_PATH = "/websocket"; private WebSocketServerHandshaker handshaker; //频道channel private static List<Channel> channels = null; private static ChannelGroup channelG = null; static{ channels = new ArrayList<Channel>(); channelG = new DefaultChannelGroup(); System.out.println("staic..............."); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Object msg = e.getMessage(); if (msg instanceof HttpRequest) { handleHttpRequest(ctx, (HttpRequest) msg); } else if (msg instanceof WebSocketFrame) { handleWebSocketFrame(ctx, (WebSocketFrame) msg); } } /** * 请求分发到页面 * @param ctx * @param req * @throws Exception */ private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) throws Exception { // Allow only GET methods. if (req.getMethod() != GET) { sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN)); return; } // Send the demo page and favicon.ico if (req.getUri().equals("/")) { HttpResponse res = new DefaultHttpResponse(HTTP_1_1, OK); ChannelBuffer content = WebSocketServerIndexPage.getContent(getWebSocketLocation(req)); res.setHeader(CONTENT_TYPE, "text/html; charset=UTF-8"); setContentLength(res, content.readableBytes()); res.setContent(content); sendHttpResponse(ctx, req, res); return; } else if (req.getUri().equals("/favicon.ico")) { HttpResponse res = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND); sendHttpResponse(ctx, req, res); return; } // Handshake WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( getWebSocketLocation(req), null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { wsFactory.sendUnsupportedWebSocketVersionResponse(ctx.getChannel()); } else { handshaker.handshake(ctx.getChannel(), req).addListener(WebSocketServerHandshaker.HANDSHAKE_LISTENER); } } private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // Check for closing frame if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.getChannel(), (CloseWebSocketFrame) frame); return; } else if (frame instanceof PingWebSocketFrame) { ctx.getChannel().write(new PongWebSocketFrame(frame.getBinaryData())); return; } else if (!(frame instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass() .getName())); } // Send the uppercase string back. String request = ((TextWebSocketFrame) frame).getText(); if (null != request) { System.out.println(String.format("Channel %s received %s", ctx.getChannel().getId(), request)); if (logger.isDebugEnabled()) { logger.debug(String.format("Channel %s received %s", ctx.getChannel().getId(), request)); } // ctx.getChannel().write(new TextWebSocketFrame(request.toUpperCase())); channelG.write(new TextWebSocketFrame(request.toUpperCase())); //循环频道 // System.err.println(channels.size()); // for (int i = 0; i < channels.size(); i++) { // channels.get(i).write(new TextWebSocketFrame(request.toUpperCase())); // } } } private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) { // Generate an error page if response status code is not OK (200). if (res.getStatus().getCode() != 200) { res.setContent(ChannelBuffers.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8)); setContentLength(res, res.getContent().readableBytes()); } // Send the response and close the connection if necessary. ChannelFuture f = ctx.getChannel().write(res); if (!isKeepAlive(req) || res.getStatus().getCode() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { e.getCause().printStackTrace(); e.getChannel().close(); System.out.println("exceptionCaught============="); } private static String getWebSocketLocation(HttpRequest req) { return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + WEBSOCKET_PATH; } @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { // super.channelConnected(ctx, e); // channels.add(e.getChannel()); channelG.add(e.getChannel()); // channelG.addAll(channels); } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelDisconnected(ctx, e); } }
/** * Generates the demo HTML page which is served at http://localhost:8080/ 页面(该页面写在代码的形式输出) */ public final class WebSocketServerIndexPage { private static final String NEWLINE = "\r\n"; public static ChannelBuffer getContent(String webSocketLocation) { return ChannelBuffers.copiedBuffer( "<html><head><title>Web Socket Test</title></head>" + NEWLINE + "<body>" + NEWLINE + "<script type=\"text/javascript\">" + NEWLINE + "var socket;" + NEWLINE + "if (!window.WebSocket) {" + NEWLINE + " window.WebSocket = window.MozWebSocket;" + NEWLINE + "}" + NEWLINE + "if (window.WebSocket) {" + NEWLINE + " socket = new WebSocket(\"" + webSocketLocation + "\");" + NEWLINE + " socket.onmessage = function(event) {" + NEWLINE + " var ta = document.getElementById('responseText');" + NEWLINE + " ta.value = ta.value + '\\n' + event.data" + NEWLINE + " };" + NEWLINE + " socket.onopen = function(event) {" + NEWLINE + " var ta = document.getElementById('responseText');" + NEWLINE + " ta.value = \"Web Socket opened!\";" + NEWLINE + " };" + NEWLINE + " socket.onclose = function(event) {" + NEWLINE + " var ta = document.getElementById('responseText');" + NEWLINE + " ta.value = ta.value + \"Web Socket closed\"; " + NEWLINE + " };" + NEWLINE + "} else {" + NEWLINE + " alert(\"Your browser does not support Web Socket.\");" + NEWLINE + "}" + NEWLINE + NEWLINE + "function send(message) {" + NEWLINE + " if (!window.WebSocket) { return; }" + NEWLINE + " if (socket.readyState == WebSocket.OPEN) {" + NEWLINE + " socket.send(message);" + NEWLINE + " } else {" + NEWLINE + " alert(\"The socket is not open.\");" + NEWLINE + " }" + NEWLINE + "}" + NEWLINE + "</script>" + NEWLINE + "<form onsubmit=\"return false;\">" + NEWLINE + "<input type=\"text\" name=\"message\" value=\"Hello, World!\"/>" + "<input type=\"button\" value=\"Send Web Socket Data\"" + NEWLINE + " onclick=\"send(this.form.message.value)\" />" + NEWLINE + "<h3>Output</h3>" + NEWLINE + "<textarea id=\"responseText\" style=\"width:500px;height:300px;\"></textarea>" + NEWLINE + "</form>" + NEWLINE + "</body>" + NEWLINE + "</html>" + NEWLINE, CharsetUtil.US_ASCII); } private WebSocketServerIndexPage() { // Unused } }
/** */ public class WebSocketServerPipelineFactory implements ChannelPipelineFactory { public ChannelPipeline getPipeline() throws Exception { // Create a default pipeline implementation. ChannelPipeline pipeline = pipeline(); pipeline.addLast("decoder", new HttpRequestDecoder()); pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("handler", new WebSocketServerHandler()); return pipeline; } }
注:在推前两次请求时会报
java.lang.IllegalArgumentException: unsupported message type: class org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame
暂末找到原因,不过不影响程序的运行,跟进中......