Comet技术在项目中的使用

云服务器探讨 2012-08-14

Comet技术在项目中的使用

 

Comet是一种服务器端推的技术,所谓服务器端推也就是当有事件要通知给某个用户的时候,是由服务器端直接发送到用户的浏览器。服务器端Push目前一般有两种方式,HTTP streaming和Long polling。详细的介绍可以看这里 http://en.wikipedia.org/wiki/Push_technology

有一个Comet的框架叫做Cometd,使用的方式为Long polling。它是使用了jetty continuations特性,jetty continuations使得异步的request成为可能,这里我们来讨论下为何需要jetty continuations呢?

比如我们的浏览器的一个请求发送到服务器端了,并进行长轮询,保持了连接不结束,直到一次长轮询timeout或者有事件发生,并接收到服务端推来的消息,所以在一次长轮询的过程中,大部分时间都是在等待,如果使用老式同步的方式进行编程的话,那么有多少个连接就需要多少个线程在那里,而大都数都是在等待,所以这无疑是系统资源的巨大浪费。

jettycontinuations很好的解决了这一问题,当有请求过来之后,将连接的相关信息封装到一个continuation的对象中,通过调用continuation的suspend方法,然后返回,把当前线程交还到线程池,所以这个时候线程可以返回到线程池等待并处理其他新的请求。

当有事件要发给之前的某个请求的时候,再调用对应的continuation的resume方法,将原来的哪个请求重新发送到servelt进行处理,并将消息发送给客户端,然后客户端会重新进行一次长轮询。

Jetty是一个纯java实现的非常轻量级的web容器,高度组件化,可以很方便的将各种组件进行组装,而且可以非常容易的将jetty嵌入到自己的应用中。

jetty运行时的核心类是Server类,这个类的配置一般在jetty.xml中配置,然后jetty自带的一个简单的ioc容器将server加载初始化。

下图主要描述了Jetty在NIO的模式下工作的情形,这里只说到将任务分配到ThreadPool,后面的ThreadPool的处理没有说,大家可以去看下源码。

在jetty中,web容器启动是从Server开始的,一个Server可以对应多个Connector,从名字就可以知道,Connector是来处理外部连接的,Connector的实现有多种,即可以是非阻塞的(如SelectChannelConnector),也可以是阻塞的(如BlockingChannelConnector,当然jetty中这个阻塞的已经使用nio优化过,性能应该比使用javaio实现的好),

我们不能直接说谁的性能好,谁的性能不好,关键还是看应用场景,因为NIO实现的非阻塞的话,doSelect的过程是阻塞的。所以当并发量小,且请求可以快速得到响应的话,用阻塞的就可以很好的满足了,但是当并发量很大,且后端资源紧张,请求需要等待很长一段时间的(比如长轮询),那么NIO的性能肯定必传统的高很多很多倍。

这里稍微讲一下NIO的概念把,在NIO的Scoket通讯模型中,一个socket连接对应一个SocketChannel,SocketChannel可以将某个事件注册到某一个Selector上,然后对Selector进行select操作,当有请求来的时候,并可以通过Selector的selectedKeys()获得所有收到事件的channel,然后便可以对channel进行操作了。这个其实和linux中的select函数类似,只不过这里是面向对象的,在linux中,我们将需要监听的sockt连接加入到一个文件描述符的集合中FD_SET中,然后select函数对这个集合进行检测,根据得到的结果来判断某个fd对应的标志位是否为1来判断是否有数据。这样也就是一个线程可以同事处理多个连接。

换话题了,我们都知道请求最终都是在Servlet中被处理的,而Servlet得到的是request,response,这些对象什么时候出来的呢?不急,上面不是说到一个EndPoint(实现了Runnable接口)EndPoint对象在被初始化的时候就对其_connection成员进行了初始化,生成一个HttpConnection对象,newConnection的方法其实在SelectChannelConnector中被覆盖了。然后这个EndPoint对象不是被分配到ThreadPool了么,ThreadPool将其加入到队列中,当有空闲线程的时候,就对这个endPoint对象进行处理了,运行EndPoint的run方法,然后会调用自己的connection对象的handle方法,最终将connection对象交给Server的handler进行处理。Server本身继承自HandlerWrapper,自己的_handler是一个HandlerCollection的实例,HandlerCollection实例的配置在jetty.xml中有配置,在处理httpconnection对象的时候所配置的handler会依次被执行。DefaultHandler中就涉及到上下文处理,然后交给各个项目的servlet进行处理。

环境配置方法:

服务器端:

类库清单:WEB-INF/lib

jetty-6.1.9.jar

jetty-util-6.1.9.jar

servlet-api-2.5-6.1.9.jar

(以上Jetty服务器自带)

cometd-api-0.9.20080221.jar

cometd-bayeux-6.1.9.jar

web.xml配置:

     
    <!--  配置ContinuationCometdServlet, 这个是必须的。配置后就可以支持comted  --> 

<servlet>

<servlet-name>cometd</servlet-name>

<servlet-class>org.mortbay.cometd.continuation.ContinuationCometdServlet</servlet-class>

<!--对队列的内容进行过滤-->

<init-param>

<param-name>filters</param-name>

<param-value>/WEB-INF/filters.json</param-value>

</init-param>

<!--超时设置Theserversidepolltimeoutinmilliseconds(default250000).Thisishowlongtheserverwill

holdareconnectrequestbeforeresponding.-->

<init-param>

<param-name>timeout</param-name>

<param-value>120000</param-value>

</init-param>

<!--Theclientsidepolltimeoutinmilliseconds(default0).Howlongaclientwillwaitbetween

reconnects-->

<init-param>

<param-name>interval</param-name>

<param-value>0</param-value>

</init-param>

<!--theclientsidepolltimeoutifmultipleconnectionsaredetectedfromthesamebrowser

(default1500).-->

<init-param>

<param-name>multiFrameInterval</param-name>

<param-value>1500</param-value>

</init-param>

<!--0=none,1=info,2=debug-->

<init-param>

<param-name>logLevel</param-name>

<param-value>0</param-value>

</init-param>

<!--If"true"thentheserverwillacceptJSONwrappedinacommentandwillgenerateJSONwrapped

inacomment.ThisisadefenceagainstAjaxHijacking.-->

<init-param>

<param-name>JSONCommented</param-name>

<param-value>true</param-value>

</init-param>

<init-param>

<param-name>alwaysResumePoll</param-name>

<param-value>false</param-value><!--usetrueforx-sitecometd-->

</init-param>

<load-on-startup>1</load-on-startup>

</servlet>

<servlet-mapping>

<servlet-name>cometd</servlet-name>

<url-pattern>/cometd/*</url-pattern>

</servlet-mapping>

filters.json内容如下:

格式如下:

{

"channels":"/**",--要过滤的队列(支持通配符)

"filter":"org.mortbay.cometd.filter.NoMarkupFilter",--使用的过滤器,实现接口dojox.cometd.DataFilter

"init":{}--初始化的值,调用DataFilter.init方法传入

}

示例内容如下:

[

{

"channels":"/**",

"filter":"org.mortbay.cometd.filter.NoMarkupFilter",

"init":{}

},

{

"channels":"/chat/*",

"filter":"org.mortbay.cometd.filter.RegexFilter",

"init":[

["[fF].ck","dang"],

["teh","the"]

]

},

{

"channels":"/chat/**",

"filter":"org.mortbay.cometd.filter.RegexFilter",

"init":[

["[Mm]icrosoft","Micro\\$oft"],

[".*tomcat.*",null]

]

}

]

这时,服务器端的配置就已经完成的,基本的cometd功能就可以使用了。

客户端通过dojox.cometd.init("http://127.0.0.2:8080/cometd");就可以进行连接。

代码开发:

接下来,我们要准备客户端(使用dojo来实现)

一共三个文件

index.html

chat.js

chat.css(不是必须)

下面来看一下这两个文件的内容(加入注释)

index.html

< html > 

<head>

<title>Cometdchat</title>

<scripttype="text/javascript"src="../dojo/dojo/dojo.js"></script><!--dojo类库-->

<scripttype="text/javascript"src="../dojo/dojox/cometd.js.uncompressed.js"></script><!--dojo-cometd类库-->

<scripttype="text/javascript"src="chat.js"></script><!--chatjs文件,控制cometd的连接,消息的发送与接收-->

<linkrel="stylesheet"type="text/css"href="chat.css">

</head>

<body>

<h1>CometdChat</h1>

<divid="chatroom">

<divid="chat"></div>

<divid="input">

<divid="join"><!--未登录时,显示的登录名和登录按钮-->

Username:&nbsp;<inputid="username"type="text"/>

<inputid="joinB"class="button"type="submit"name="join"value="Join"/>

</div>

<divid="joined"class="hidden"><!--登录后,显示的消息框和发送,退出按钮(默认为隐藏)-->

Chat:&nbsp;<inputid="phrase"type="text"></input>

<inputid="sendB"class="button"type="submit"name="join"value="Send"/>

<inputid="leaveB"class="button"type="submit"name="join"value="Leave"/>

</div>

</div>

</div>

</ body >

chat.js文件

  1  // 引入所需要的类 

2dojo.require("dojox.cometd");

3dojo.require("dojox.cometd.timestamp");

4

5//定义一个room类

6varroom={

7//定义属性

8_last:"",//最后发送消息的人员(如果不是本人,则显示为空)

9_username:null,//当前的用户名

10_connected:true,//当前的连接状态true已经连接,false表示未连接

11groupName:"whimsical",//组名(未知)

12

13//登录操作

14join:function(name){

15

16if(name==null||name.length==0){

17alert('Pleaseenterausername!');

18}else{

19

20dojox.cometd.init(

newString(document.location).replace(/http:\/\/[^\/]*/,'').replace(/\/examples\/.*$/,'')+"/cometd");

21//dojox.cometd.init("http://127.0.0.2:8080/cometd");

22this._connected=true;

23

24this._username=name;

25dojo.byId('join').className='hidden';

26dojo.byId('joined').className='';

27dojo.byId('phrase').focus();

28

29//subscribeandjoin

30dojox.cometd.startBatch();

31dojox.cometd.subscribe("/chat/demo",room,"_chat",{groupName:this.groupName});

32dojox.cometd.publish("/chat/demo",{

33user:room._username,

34join:true,

35chat:room._username+"hasjoined"

36},{groupName:this.groupName});

37dojox.cometd.endBatch();

38

39//handlecometdfailureswhileintheroom

40room._meta=dojo.subscribe("/cometd/meta",this,function(event){

41console.debug(event);

42if(event.action=="handshake"){

43room._chat({data:{

44join:true,

45user:"SERVER",

46chat:"reinitialized"

47}});

48dojox.cometd.subscribe("/chat/demo",room,"_chat",{groupName:this.groupName});

49}elseif(event.action=="connect"){

50if(event.successful&&!this._connected){

51room._chat({data:{

52leave:true,

53user:"SERVER",

54chat:"reconnected!"

55}});

56}

57if(!event.successful&&this._connected){

58room._chat({data:{

59leave:true,

60user:"SERVER",

61chat:"disconnected!"

62}});

63}

64this._connected=event.successful;

65}

66},{groupName:this.groupName});

67}

68},

69

70//离开操作

71leave:function(){

72if(!room._username){

73return;

74}

75

76if(room._meta){

77dojo.unsubscribe(room._meta,null,null,{groupName:this.groupName});

78}

79room._meta=null;

80

81dojox.cometd.startBatch();

82dojox.cometd.unsubscribe("/chat/demo",room,"_chat",{groupName:this.groupName});

83dojox.cometd.publish("/chat/demo",{

84user:room._username,

85leave:true,

86chat:room._username+"hasleft"

87},{groupName:this.groupName});

88dojox.cometd.endBatch();

89

90//switchtheinputform

91dojo.byId('join').className='';

92dojo.byId('joined').className='hidden';

93dojo.byId('username').focus();

94room._username=null;

95dojox.cometd.disconnect();

96},

97

98//发送消息

99chat:function(text){

100if(!text||!text.length){

101returnfalse;

102}

103dojox.cometd.publish("/chat/demo",{user:room._username,chat:text},{groupName:this.groupName});

104},

105

106//从服务器收到消息后,回调的方法

107_chat:function(message){

108varchat=dojo.byId('chat');

109if(!message.data){

110console.debug("badmessageformat"+message);

111return;

112}

113varfrom=message.data.user;

114varspecial=message.data.join||message.data.leave;

115vartext=message.data.chat;

116if(!text){return;}

117

118if(!special&&from==room._last){

119from="";

120}else{

121room._last=from;

122from+=":";

123}

124

125if(special){

126chat.innerHTML+="<spanclass=\"alert\"><spanclass=\"from\">"+from+"&nbsp;

</span><spanclass=\"text\">"+text+"</span></span><br/>";

127room._last="";

128}else{

129chat.innerHTML+="<spanclass=\"from\">"+from+"&nbsp;</span><spanclass=\"text\">"+text+"</span><br/>";

130}

131chat.scrollTop=chat.scrollHeight-chat.clientHeight;

132},

133

134//初始操作

135_init:function(){

136dojo.byId('join').className='';

137dojo.byId('joined').className='hidden';

138dojo.byId('username').focus();

139

140varelement=dojo.byId('username');

141element.setAttribute("autocomplete","OFF");

142dojo.connect(element,"onkeyup",function(e){//支持回车,登录

143if(e.keyCode==dojo.keys.ENTER){

144room.join(dojo.byId('username').value);

145returnfalse;

146}

147returntrue;

148});

149

150dojo.connect(dojo.byId('joinB'),"onclick",function(e){//绑定room.join方法到Join按扭

151room.join(dojo.byId('username').value);

152e.preventDefault();

153});

154

155element=dojo.byId('phrase');//取得消息框

156element.setAttribute("autocomplete","OFF");

157dojo.connect(element,"onkeyup",function(e){//支持回车发送消息

158if(e.keyCode==dojo.keys.ENTER){

159room.chat(dojo.byId('phrase').value);

160dojo.byId('phrase').value='';

161e.preventDefault();

162}

163});

164

165dojo.connect(dojo.byId('sendB'),"onclick",function(e){//绑定room.chat方法到sendB按扭

166room.chat(dojo.byId('phrase').value);

167dojo.byId('phrase').value='';

168});

169dojo.connect(dojo.byId('leaveB'),"onclick",room,"leave");//绑定room.leave方法到leaveB按扭

170}

171};

172

173//页面装载时,调用room._init方法

174dojo.addOnLoad(room,"_init");

175//页面关闭时,调用room.leave方法

176dojo.addOnUnload(room,"leave");

177

178  // vim:ts=4:noet:

补充:服务器端如何监控消息队列,以及进行订阅,发送消息操作

要进行监控消息队列,以及进行订阅,发送消息操作的关键就是取得Bayeux接口实现类的实例

可以通过ServletContextAttributeListener这个监听器接口,通过attributeAdded方式加入

实现方法如下:

 1  public   class  BayeuxStartupListener  implements  ServletContextAttributeListener

2{

3publicvoidinitialize(Bayeuxbayeux)

4{

5synchronized(bayeux)

6{

7if(!bayeux.hasChannel("/service/echo"))

8{

9//取得bayeux实例

10}

11}

12}

13

14publicvoidattributeAdded(ServletContextAttributeEventscab)

15{

16if(scab.getName().equals(Bayeux.DOJOX_COMETD_BAYEUX))

17{

18Bayeuxbayeux=(Bayeux)scab.getValue();

19initialize(bayeux);

20}

21}

22

23publicvoidattributeRemoved(ServletContextAttributeEventscab)

24{

25

26}

27

28publicvoidattributeReplaced(ServletContextAttributeEventscab)

29{

30

31}

32  }

取到Bayeux实例后,就可以借助BayeuxService类帮我们实现消息队列的监听,订阅消息以及发送消息

 1       public   void  initialize(Bayeux bayeux)

2{

3synchronized(bayeux)

4{

5if(!bayeux.hasChannel("/service/echo"))

6{

7//取得bayeux实例

8newChatService(bayeux);

9}

10}

11      }

具体方法请看下面这段代码:

 1  // 定义 ChatService类,继承 BayeuxService 

2publicstaticclassChatServiceextendsBayeuxService{

3

4ConcurrentMap<String,Set<String>>_members=newConcurrentHashMap<String,Set<String>>();

5

6publicChatService(Bayeuxbayeux)

7{

8super(bayeux,"chat");//必须,把Bayeux传入到BayeuxService对象中

9subscribe("/chat/**","trackMembers");//订阅队列,收到消息后,会回调trackMembers方法

10/*

11subscribe支持回调的方法如下:

12#myMethod(ClientfromClient,Objectdata)

13#myMethod(ClientfromClient,Objectdata,Stringid)

14#myMethod(ClientfromClient,Stringchannel,Objectdata,Stringid)

15#myMethod(ClientfromClient,Messagemessage)

16

17参数:

18ClientfromClient发送消息的客户端

19Objectdata消息内容

20idTheidofthemessage

21channel队列名称

22Messagemessage消息对象。继承于Map

23

24*/

25}

26

27//发布消息到队列

28publicvoidsendMessage(Stringmessage){

29Map<String,Object>mydata=newHashMap<String,Object>();

30mydata.put("chat",message);

31

32Clientsender=getBayeux().newClient("server");

33

34getBayeux().getChannel("/chat/demo",false).publish(sender,mydata,"0"/*null*/);

35

36}

37

38//发送消息给指定的client(非广播方式)

39publicvoidsendMessageToClient(Clientjoiner,Stringmessage){

40Map<String,Object>mydata=newHashMap<String,Object>();

41mydata.put("chat",message);

42

43send(joiner,"/chat/demo",mydata,"0"/*null*/);

44}

45

46//订阅消息回调方法

47publicvoidtrackMembers(Clientjoiner,Stringchannel,Map<String,Object>data,Stringid)

48{

49//解释消息内容,如果消息内容中有join这个字段且值为true

50if(Boolean.TRUE.equals(data.get("join")))

51{

52//根据队列,取得当前登录的人员

53Set<String>m=_members.get(channel);

54if(m==null)

55{

56//如果为空,则创建一个新的Set实现

57Set<String>new_list=newCopyOnWriteArraySet<String>();

58m=_members.putIfAbsent(channel,new_list);

59if(m==null)

60m=new_list;

61}

62

63finalSet<String>members=m;

64finalStringusername=(String)data.get("user");

65

66members.add(username);

67//为该client增加事件,Remove事件。当用户退出时,触发该方法。

68joiner.addListener(newRemoveListener(){

69publicvoidremoved(StringclientId,booleantimeout)

70{

71members.remove(username);

72}

73});

74

75//为该client增加事件,消息的发送和接收事件。当用户退出时,触发该方法。

76joiner.addListener(newMessageListener(){

77publicvoiddeliver(ClientfromClient,ClienttoClient,Messagemessage){

78System.out.println("messagefrom"+fromClient.getId()+"to"

79+toClient.getId()+"messageis"+message.getData());

80}

81});

82

83Map<String,Object>mydata=newHashMap<String,Object>();

84mydata.put("chat","members="+members);

85//把已经登录的人员信息列表,发送回给消息发送者

86send(joiner,channel,mydata,id);

87

88}

89}

90}

91

相关推荐