gary00 2013-07-19
Netty是由JBOSS提供的一个java开源框架,基于nio。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
官网:http://netty.io/
websocket通信示例如下(以下使用的netty版本为3.6.1):
说明:
通信服务端运用Netty,Netty中自带Jboss,iphone使用SocketRocket,进行推送数据。
随便画的草图:

SocketRocket关键代码(需引入SocketRocket相关包和文件,可在官网下载):
/// Creates the WebSocket connection to the push server when the view loads.
- (void)viewDidLoad
{
    [super viewDidLoad];

    // Detach and close any socket left over from a previous load so its
    // callbacks can no longer reach this controller.
    _webSocket.delegate = nil;
    [_webSocket close];

    // 127.0.0.1 is the loopback address (127.0.0.0 is the network address and is
    // not a connectable host). Loopback only reaches a server on the same machine,
    // e.g. from the simulator; on a device use the server's real address.
    NSURL *serverURL = [NSURL URLWithString:@"ws://127.0.0.1:8080/websocket"];
    _webSocket = [[SRWebSocket alloc] initWithURLRequest:[NSURLRequest requestWithURL:serverURL]];
    _webSocket.delegate = self;
    [_webSocket open];

    // The outcome is reported through the delegate callbacks
    // (webSocketDidOpen: / webSocket:didFailWithError:), so only log that the
    // attempt has started.
    NSLog(@"Opening WebSocket connection...");
}
#pragma mark - SRWebSocketDelegate
/// SRWebSocketDelegate callback: logs the event and shows the connected state
/// in the controller's title.
- (void)webSocketDidOpen:(SRWebSocket *)webSocket
{
    NSLog(@"Websocket Connected");
    self.title = @"Connected!";
}
/// SRWebSocketDelegate callback: logs the error, reflects the failure in the
/// title and discards the socket.
- (void)webSocket:(SRWebSocket *)webSocket didFailWithError:(NSError *)error
{
    NSLog(@":( Websocket Failed With Error %@", error);

    // Mirror the close callback so the UI does not keep showing a stale state.
    self.title = @"Connection Failed! (see logs)";

    // Detach before releasing so the failed socket cannot call back into us.
    _webSocket.delegate = nil;
    _webSocket = nil;
}
/// SRWebSocketDelegate callback: logs the incoming message and displays it.
///
/// @param webSocket The socket that received the message.
/// @param message   Declared as `id`, so its class is checked before it is
///                  used as text: an NSString is shown as-is, an NSData is
///                  shown only if it decodes as UTF-8.
- (void)webSocket:(SRWebSocket *)webSocket didReceiveMessage:(id)message
{
    NSLog(@"Received \"%@\"", message);

    NSString *text = nil;
    if ([message isKindOfClass:[NSString class]]) {
        text = message;
    } else if ([message isKindOfClass:[NSData class]]) {
        // Returns nil when the bytes are not valid UTF-8.
        text = [[NSString alloc] initWithData:message encoding:NSUTF8StringEncoding];
    }

    // Only touch the text view when there is displayable text; assigning a
    // non-string object to `text` would fail at runtime.
    if (text != nil) {
        self.showTxt.text = text;
    }
}
/// SRWebSocketDelegate callback: logs the closure, updates the title and
/// releases the socket reference.
- (void)webSocket:(SRWebSocket *)webSocket
 didCloseWithCode:(NSInteger)code
           reason:(NSString *)reason
         wasClean:(BOOL)wasClean
{
    NSLog(@"WebSocket closed");
    self.title = @"Connection Closed! (see logs)";
    _webSocket = nil;
}
网页WebSocket关代码:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width"/>
<!-- WebSocket demo client -->
<!-- Note: some browsers expose WebSocket under a different name and need special handling. -->
<script type="text/javascript">
// The single connection shared by init() and send().
var websocket;

// Opens the WebSocket connection and wires up the event handlers.
function init(){
    // Older Firefox builds expose the API as MozWebSocket.
    window.WebSocket = window.WebSocket || window.MozWebSocket;
    if(!window.WebSocket){
        alert("WebSocket not support browser!");
        return;
    }
    // 127.0.0.1 is the loopback address; 127.0.0.0 is the network address and
    // is not a connectable host.
    websocket = new WebSocket("ws://127.0.0.1:8080/websocket");
    websocket.onopen = onOpen;
    websocket.onclose = onClose;
    websocket.onmessage = onMessage;
    websocket.onerror = onError;
}

// Connection opened: intentionally a no-op.
function onOpen(evt){
}

// Connection closed: intentionally a no-op.
function onClose(evt){
}

// Appends each received message to the output textarea on a new line.
function onMessage(evt){
    var msg = document.getElementById('responseText');
    msg.value = msg.value+"\n"+evt.data;
}

// Reports a connection error to the user.
function onError(evt){
    alert("connect error!");
}

// Sends a message if the connection is open, otherwise warns the user.
function send(message){
    if(!window.WebSocket){
        return;
    }
    if(websocket.readyState == WebSocket.OPEN){
        websocket.send(message);
    }else{
        alert("The websocket is not open!");
    }
}
</script>
</head>
<body onload="init()">
<form onsubmit="return false;">
<!-- <input> is a void element and must not have an end tag. -->
<input type="text" name="message"/><br/>
<input type="button" value="发送" onclick="send(this.form.message.value)"/><br/>
<h3>输出:</h3>
<textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form>
</body>
</html>
Netty代码:
/**
 * Entry point of the demo: binds a Netty server on the given port and installs
 * {@link WebSocketServerPipelineFactory} as its pipeline factory.
 */
public class WebSocketServer {

    /** TCP port the server listens on. */
    private final int port;

    /**
     * @param port TCP port to bind to
     */
    public WebSocketServer(int port) {
        this.port = port;
    }

    /** Configures the bootstrap, binds to the port and prints the startup banner. */
    public void run() {
        // Cached thread pools are passed as the two executors of the channel factory.
        NioServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        ServerBootstrap server = new ServerBootstrap(channelFactory);

        // Every accepted connection gets its pipeline from this factory.
        server.setPipelineFactory(new WebSocketServerPipelineFactory());

        // Start accepting incoming connections.
        server.bind(new InetSocketAddress(port));

        System.out.println("Web socket server started at port " + port + ".");
        System.out.println("Open your browser and navigate to http://localhost:" + port + "/");
    }

    /**
     * Starts the server.
     *
     * @param args optional first argument: the port to listen on (defaults to 8080)
     */
    public static void main(String[] args) {
        int listenPort = args.length > 0 ? Integer.parseInt(args[0]) : 8080;
        new WebSocketServer(listenPort).run();
    }
}
/**
* Handles handshakes and messages
*/
public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocketServerHandler.class);
    private static final String WEBSOCKET_PATH = "/websocket";

    /**
     * Channels that have completed the WebSocket handshake. Incoming text
     * frames are broadcast to every member of this group.
     */
    private static final ChannelGroup channelG = new DefaultChannelGroup();

    static {
        System.out.println("staic...............");
    }

    /** Handshaker of this connection; set once an upgrade request has been accepted. */
    private WebSocketServerHandshaker handshaker;

    /**
     * Dispatches an incoming message: HTTP requests go to the page/handshake
     * logic, WebSocket frames to the frame handler. Other messages are ignored.
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Object msg = e.getMessage();
        if (msg instanceof HttpRequest) {
            handleHttpRequest(ctx, (HttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * Serves the demo page, answers the favicon request with 404 and treats
     * any other GET request as a WebSocket handshake.
     *
     * @param ctx handler context of the connection
     * @param req the HTTP request
     * @throws Exception propagated from the handshake
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) throws Exception {
        // Allow only GET methods.
        if (req.getMethod() != GET) {
            sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN));
            return;
        }
        // Send the demo page and favicon.ico
        if (req.getUri().equals("/")) {
            HttpResponse res = new DefaultHttpResponse(HTTP_1_1, OK);
            ChannelBuffer content = WebSocketServerIndexPage.getContent(getWebSocketLocation(req));
            res.setHeader(CONTENT_TYPE, "text/html; charset=UTF-8");
            setContentLength(res, content.readableBytes());
            res.setContent(content);
            sendHttpResponse(ctx, req, res);
            return;
        } else if (req.getUri().equals("/favicon.ico")) {
            HttpResponse res = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND);
            sendHttpResponse(ctx, req, res);
            return;
        }
        // Handshake
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                getWebSocketLocation(req), null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            wsFactory.sendUnsupportedWebSocketVersionResponse(ctx.getChannel());
        } else {
            ChannelFuture handshakeFuture = handshaker.handshake(ctx.getChannel(), req);
            handshakeFuture.addListener(WebSocketServerHandshaker.HANDSHAKE_LISTENER);
            // Join the broadcast group only after the handshake has succeeded.
            // Channels that are still plain HTTP (demo page, favicon) cannot
            // encode WebSocket frames; broadcasting to them fails with
            // "IllegalArgumentException: unsupported message type".
            handshakeFuture.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        channelG.add(future.getChannel());
                    }
                }
            });
        }
    }

    /**
     * Handles one WebSocket frame: completes a close handshake, answers a ping
     * with a pong, and broadcasts the upper-cased text of a text frame to all
     * channels in the group.
     *
     * @throws UnsupportedOperationException for any other frame type
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Check for closing frame
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.getChannel(), (CloseWebSocketFrame) frame);
            return;
        } else if (frame instanceof PingWebSocketFrame) {
            ctx.getChannel().write(new PongWebSocketFrame(frame.getBinaryData()));
            return;
        } else if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
                    .getName()));
        }
        // Broadcast the uppercase string to every handshaken client.
        String request = ((TextWebSocketFrame) frame).getText();
        if (null != request) {
            System.out.println(String.format("Channel %s received %s", ctx.getChannel().getId(), request));
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("Channel %s received %s", ctx.getChannel().getId(), request));
            }
            channelG.write(new TextWebSocketFrame(request.toUpperCase()));
        }
    }

    /**
     * Writes an HTTP response. Non-200 responses get the status line as their
     * body, and the connection is closed afterwards unless the request is
     * keep-alive and the status is 200.
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
        // Generate an error page if response status code is not OK (200).
        if (res.getStatus().getCode() != 200) {
            res.setContent(ChannelBuffers.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
            setContentLength(res, res.getContent().readableBytes());
        }
        // Send the response and close the connection if necessary.
        ChannelFuture f = ctx.getChannel().write(res);
        if (!isKeepAlive(req) || res.getStatus().getCode() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Prints the stack trace of the failure and closes the affected channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        e.getCause().printStackTrace();
        e.getChannel().close();
        System.out.println("exceptionCaught=============");
    }

    /** Builds the ws:// URL of the WebSocket endpoint from the request's Host header. */
    private static String getWebSocketLocation(HttpRequest req) {
        return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;
    }

    /**
     * A newly connected channel is deliberately NOT added to the broadcast
     * group here: at this point it is still a plain HTTP connection. It is
     * added once its WebSocket handshake succeeds.
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        super.channelConnected(ctx, e);
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx,
            ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
    }
}
/**
* Generates the demo HTML page which is served at http://localhost:8080/
页面(该页面写在代码的形式输出)
*/
public final class WebSocketServerIndexPage {

    /** Line terminator used between the lines of the generated page. */
    private static final String NEWLINE = "\r\n";

    /**
     * Builds the demo HTML page, encoded as US-ASCII.
     *
     * @param webSocketLocation URL inserted verbatim into the page's
     *                          {@code new WebSocket("...")} call
     * @return a buffer holding the encoded page
     */
    public static ChannelBuffer getContent(String webSocketLocation) {
        StringBuilder page = new StringBuilder();

        // Document head and the script that opens the socket.
        page.append("<html><head><title>Web Socket Test</title></head>").append(NEWLINE);
        page.append("<body>").append(NEWLINE);
        page.append("<script type=\"text/javascript\">").append(NEWLINE);
        page.append("var socket;").append(NEWLINE);
        page.append("if (!window.WebSocket) {").append(NEWLINE);
        page.append(" window.WebSocket = window.MozWebSocket;").append(NEWLINE);
        page.append("}").append(NEWLINE);
        page.append("if (window.WebSocket) {").append(NEWLINE);
        page.append(" socket = new WebSocket(\"").append(webSocketLocation).append("\");").append(NEWLINE);
        page.append(" socket.onmessage = function(event) {").append(NEWLINE);
        page.append(" var ta = document.getElementById('responseText');").append(NEWLINE);
        page.append(" ta.value = ta.value + '\\n' + event.data").append(NEWLINE);
        page.append(" };").append(NEWLINE);
        page.append(" socket.onopen = function(event) {").append(NEWLINE);
        page.append(" var ta = document.getElementById('responseText');").append(NEWLINE);
        page.append(" ta.value = \"Web Socket opened!\";").append(NEWLINE);
        page.append(" };").append(NEWLINE);
        page.append(" socket.onclose = function(event) {").append(NEWLINE);
        page.append(" var ta = document.getElementById('responseText');").append(NEWLINE);
        page.append(" ta.value = ta.value + \"Web Socket closed\"; ").append(NEWLINE);
        page.append(" };").append(NEWLINE);
        page.append("} else {").append(NEWLINE);
        page.append(" alert(\"Your browser does not support Web Socket.\");").append(NEWLINE);
        page.append("}").append(NEWLINE);
        page.append(NEWLINE);

        // Client-side send() helper.
        page.append("function send(message) {").append(NEWLINE);
        page.append(" if (!window.WebSocket) { return; }").append(NEWLINE);
        page.append(" if (socket.readyState == WebSocket.OPEN) {").append(NEWLINE);
        page.append(" socket.send(message);").append(NEWLINE);
        page.append(" } else {").append(NEWLINE);
        page.append(" alert(\"The socket is not open.\");").append(NEWLINE);
        page.append(" }").append(NEWLINE);
        page.append("}").append(NEWLINE);
        page.append("</script>").append(NEWLINE);

        // Form with the message input, send button and output area.
        page.append("<form onsubmit=\"return false;\">").append(NEWLINE);
        // No line break follows the text input.
        page.append("<input type=\"text\" name=\"message\" value=\"Hello, World!\"/>");
        page.append("<input type=\"button\" value=\"Send Web Socket Data\"").append(NEWLINE);
        page.append(" onclick=\"send(this.form.message.value)\" />").append(NEWLINE);
        page.append("<h3>Output</h3>").append(NEWLINE);
        page.append("<textarea id=\"responseText\" style=\"width:500px;height:300px;\"></textarea>").append(NEWLINE);
        page.append("</form>").append(NEWLINE);
        page.append("</body>").append(NEWLINE);
        page.append("</html>").append(NEWLINE);

        return ChannelBuffers.copiedBuffer(page.toString(), CharsetUtil.US_ASCII);
    }

    private WebSocketServerIndexPage() {
        // Static utility class; never instantiated.
    }
}
/**
*/
public class WebSocketServerPipelineFactory implements ChannelPipelineFactory {

    /**
     * Builds the pipeline for a new connection: HTTP request decoder, chunk
     * aggregator (limit 65536), HTTP response encoder, then the
     * {@link WebSocketServerHandler}.
     */
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline stages = pipeline();

        stages.addLast("decoder", new HttpRequestDecoder());
        stages.addLast("aggregator", new HttpChunkAggregator(65536));
        stages.addLast("encoder", new HttpResponseEncoder());
        stages.addLast("handler", new WebSocketServerHandler());

        return stages;
    }
}
注:在推前两次请求时会报
java.lang.IllegalArgumentException: unsupported message type: class org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame
暂未找到原因,不过不影响程序的运行,跟进中......