Jolestar 2012-06-11
——使用spring与activemq
第一次写技术文章,所以不懂规矩,写得也没什么条理,主要都是根据自己的一些实际操作经验写的,
以下代码都经过实际验证,其中也有自己不甚明了之处,欢迎大家拍砖或者交流。
关于jms,我打算会有一系列的文章,这里只是其中一节,
本节主要介绍如何在spring框架下配置使用jms,jms提供者使用activemq。
该配置相关说明:
1.消息发送者与接受者分离,分别配置在两个独立的配置文件中
2.启用一个消息服务器,其中产生一个queue一个topic,两个生产者分别往其中发送对象,发送时采用spring提供的转换器,可以实现java对象与jms消息的相互转化。
3.消费者总共三个,一个消费queue中的消息,两个消费topic中的消息,我们称之为订阅者,其中一个订阅者是持久性订阅者,我们知道对于普通的订阅者来说,当该订阅者处于非活动期时topic中产生的消息是无法再传送给订阅者的(除非是实体化消息),但是持久化订阅者是可以收到其在非活动期间topic中产生的消息的(从其在服务器上注册时开始至现在)
4.发送时使用的是同步发送方式,即发送者发送消息到服务器,等待服务器发送确认消息表示发送成功,发送者可以继续发送消息,消费者或者订阅者使用的是监听器方式,所以是采用异步接收方式,即接受者不需要一直阻塞直到接收消息,而是jms服务器有消息到达时会触发消息监听器的一个动作。当我们需要使用同步接收方式时就需要像同步发送消息一样使用jmstemplate。同时这里消息的确认方式采用默认的AUTO_ACKNOWLEDGE方式,即自动确认,即服务器收到消息立马发送确认消息,同样接收者收到消息会立即向服务器发送确认消息。需要注意的是,这里的同步异步,都只是描述客户端与服务器端之间的关系,而不是这发送者-服务器-接受者三者间的关系。
下面我们看具体的配置
activemq-produce.xml
<beansxmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://activemq.apache.org/schema/corehttp://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">
<!--<context:property-placeholderlocation="classpath:server-config.properties"ignore-unresolvable="true"></context:property-placeholder>
-->
<!--指定发送端连接的activeMQ服务器-->
<beanid="connectionFactory"class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<propertyname="brokerURL"value="tcp://localhost:61616"/>
</bean>
<!--指定发送的目的地的类型和名字-->
<beanid="MyQueue"class="org.apache.activemq.command.ActiveMQQueue">
<constructor-argvalue="firstQueue"></constructor-arg>
</bean>
<beanid="Topic-A"class="org.apache.activemq.command.ActiveMQTopic">
<constructor-argvalue="Topic-A"></constructor-arg>
</bean>
<!--converter-->
<beanid="defaultMessageConverter"class="com.gy.myactivemq.DefaultMessageConverter"/>
<beanid="simpleMessageConverter"class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
<!--SpringJmsTemplateconfig,即消息发送模板-->
<beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
<propertyname="connectionFactory">
<!--letswrapinapooltoavoidcreatingaconnectionpersend-->
<!--单例模式,避免每次发送消息时新产生一个连接-->
<beanclass="org.springframework.jms.connection.SingleConnectionFactory">
<propertyname="targetConnectionFactory"ref="connectionFactory"/>
</bean>
</property>
<!--customMessageConverter-->
<propertyname="messageConverter"ref="simpleMessageConverter"/>
</bean>
<!--POJOwhichsendMessageusesSpringJmsTemplate,使用消息模板发送消息-->
<beanid="queueMessageProducer"class="com.gy.myactivemq.QueueMessageProducer">
<propertyname="template"ref="jmsTemplate"/>
<propertyname="destination"ref="MyQueue"/>
</bean>
<beanid="topicMessageProducer"class="com.gy.myactivemq.TopicMessageProducer">
<propertyname="template"ref="jmsTemplate"/>
<propertyname="destination"ref="Topic-A"/>
</bean>
</beans>
其中com.gy.myactivemq.QueueMessageProducer的java代码如下:
publicclassQueueMessageProducer{
privateJmsTemplatetemplate;
privateQueuedestination;
publicvoidsetTemplate(JmsTemplatetemplate){
this.template=template;
}
publicvoidsetDestination(Queuedestination){
this.destination=destination;
}
publicvoidsend(FooMessagemessage){
template.convertAndSend(this.destination,message);
}
publicvoidsendByMe(Stringmess){
template.convertAndSend(this.destination,mess);
}
}
其中包括两个可以用来发送消息的方法sendByMe和send,可以分别用来发送不同类型的消息,如果想要发送其他类型的消息可以自己定于,然后在单元测试中调用方式如下:
@Test
publicvoidsendTest(){
ApplicationContextctx=newClassPathXmlApplicationContext("activemq-produce.xml");
QueueMessageProducerqmp=(QueueMessageProducer)ctx.getBean("queueMessageProducer");
TopicMessageProducertmp=(TopicMessageProducer)ctx.getBean("topicMessageProducer");
FooMessagemessage=newFooMessage();
message.setId(123);
qmp.send(message);
tmp.send(message);
}
activemq-consumer.xml
<beansxmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://activemq.apache.org/schema/corehttp://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<!--<context:property-placeholderlocation="classpath:server-config.properties"ignore-unresolvable="true"></context:property-placeholder>
-->
<!--用于连接activeMQ服务器-->
<beanid="connectionFactory"class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<propertyname="brokerURL"value="tcp://localhost:61616"/>
<propertyname="clientIDPrefix"value="www"/>
</bean>
<!--ActiveMQdestinations,连接的目标名称-->
<beanid="MyQueue"class="org.apache.activemq.command.ActiveMQQueue">
<constructor-argvalue="firstQueue"></constructor-arg>
</bean>
<beanid="Topic-A"class="org.apache.activemq.command.ActiveMQTopic">
<constructor-argvalue="Topic-A"></constructor-arg>
</bean>
<beanid="simpleMessageConverter"class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
<beanid="queueListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-argref="queueConsumer"/>
<propertyname="defaultListenerMethod"value="receive"/>
<propertyname="messageConverter"ref="simpleMessageConverter"/>
</bean>
<beanid="topicListenerA"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-argref="topicConsumerA"/>
<propertyname="defaultListenerMethod"value="receive"/>
<propertyname="messageConverter"ref="simpleMessageConverter"/>
</bean>
<beanid="topicListenerB"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-argref="topicConsumerB"/>
<propertyname="defaultListenerMethod"value="receive"/>
<propertyname="messageConverter"ref="simpleMessageConverter"/>
</bean>
<beanid="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<propertyname="connectionFactory"ref="connectionFactory"/>
<propertyname="concurrentConsumers"value="1"/>
<propertyname="destination"ref="MyQueue"/>
<propertyname="messageListener"ref="queueListener"/>
</bean>
<beanid="topicListenerContainerA"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<propertyname="connectionFactory"ref="connectionFactory"/>
<propertyname="concurrentConsumers"value="1"/>
<propertyname="destination"ref="Topic-A"/>
<propertyname="messageListener"ref="topicListenerA"/>
<propertyname="subscriptionDurable"value="true"/>
<propertyname="clientId"value="clientId_001"/>
<propertyname="durableSubscriptionName"value="clientId_001"/>
</bean>
<beanid="topicListenerContainerB"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<propertyname="connectionFactory"ref="connectionFactory"/>
<propertyname="concurrentConsumers"value="1"/>
<propertyname="destination"ref="Topic-A"/>
<propertyname="messageListener"ref="topicListenerB"/>
</bean>
<!--consumerforqueue-->
<beanid="queueConsumer"class="com.gy.myactivemq.QueueConsumer"/>
<!--consumerfortopic-->
<beanid="topicConsumerA"class="com.gy.myactivemq.TopicConsumerA"/>
<beanid="topicConsumerB"class="com.gy.myactivemq.TopicConsumerB"/>
</beans>
需要注意的是这段配置:
<beanid="topicListenerContainerA"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<propertyname="connectionFactory"ref="connectionFactory"/>
<propertyname="concurrentConsumers"value="1"/>
<propertyname="destination"ref="Topic-A"/>
<propertyname="messageListener"ref="topicListenerA"/>
<propertyname="subscriptionDurable"value="true"/>
<propertyname="clientId"value="clientId_001"/>
<propertyname="durableSubscriptionName"value="clientId_001"/>
</bean>
其中subscriptionDurable属性表明该处于该container中的订阅者是一个持久订阅者,配置持久订阅者必须指定一个clientId的值,而且这个值对于每一个订阅者都必须是唯一的,因为jms服务器要根据每个订阅者的这个clientId为其注册,这样jms服务器才能确保在该订阅者不活动时为其保存消息。还注意到如下配置:
<beanid="connectionFactory"class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<propertyname="brokerURL"value="tcp://localhost:61616"/>
<propertyname="clientIDPrefix"value="www"/>
</bean>
其中<propertyname="clientIDPrefix"value="www"/>配置指明每个连接到jms服务器的连接的clientId的前缀是www,然后具体这个配置的用处现在还不清楚,之前网上说这个配置表明只有当连接的jms服务器的客户端的clientId名的前缀与这个配置一致时,该连接才能实现持久化订阅,但实际情况是这里不管要不要这个配置,订阅者只要指定任意的clientId值都可以实现持久订阅。
对于这段配置我们可以发现对于每一个listener监听者我们都需要配置一个container容器,这里我们可以通过jms标签简化这个配置,比如若有两个listener,topicListenerA和topicListenerB配置如下:
<jms:listener-containerconnection-factory="connectionFactory"concurrency="1"destination-type="topic">
<jms:listenerdestination="Topic-A"ref="topicListenerA"/>
<jms:listenerdestination="Topic-A"ref="topicListenerB"/>
</jms:listener-container>
其中topicListenerA和topicListenerB是在前面的配置文件中已经配好,当然我们也可以直接在<jms:listenerdestination="Topic-A"ref="topicListenerA"/>标签中配置,这样下面这段代码也可以省了
<beanid="topicListenerA"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-argref="topicConsumerA"/>
<propertyname="defaultListenerMethod"value="receive"/>
<propertyname="messageConverter"ref="simpleMessageConverter"/>
</bean>
具体配置可以参阅《java消息服务》一书。
使用jms标签需要加入如下命名空间:
<beansxmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://activemq.apache.org/schema/corehttp://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
总之,这样就省去了一个listener需要一个container的麻烦,如果需要配置持久订阅者则需要如下配置:
<!--弊端:一个listener-container若指定了client-id和destination-type来实现持久化订阅,则只能含有一个listener,因为每个listener的client-id必须唯一-->
<jms:listener-containerconnection-factory="connectionFactory"concurrency="1"client-id="clientId_001"destination-type="durableTopic">
<jms:listenerdestination="Topic-A"ref="topicListenerA"/>
</jms:listener-container>
这么做的弊端已经在注释上标明,当然也许不是这样,但是目前个人没有发现如何将持久订阅者和非持久订阅者放在同一个container中。另外destination-type属性用于指定监听的消息提供者类型,默认是queue,所以监听topic时需要指定类型。
开启消息监听的代码很简单:
publicstaticvoidmain(String[]agr){
ApplicationContextctx=newClassPathXmlApplicationContext("activemq-consumer.xml");
}
注意这里不使用单元测试的@Test是因为,使用单元测试的方法运行之后马上会停止该方法,导致连接之后马上会断开,消息可能都还没有被监听者收到,所以我们需要保持监听者一直与jms服务器连接。
这里,需要用的jar包有三个必须的,分别是spring-jms,activemq-all,slf4j相关的包(slf4j-simple,slf4j-spi),当然spring读取配置文件的相关包肯定也是需要的,用过spring的应该都知道。
另外,activemq的启动是单独启动的,也可以通过如下方式在spring配置中启动:
<amq:brokeruseJmx="false"persistent="true">
<amq:persistenceAdapter>
<amq:amqPersistenceAdapterdirectory="d:/amq"/>
</amq:persistenceAdapter>
<amq:transportConnectors>
<amq:transportConnectoruri="tcp://localhost:61616"/>
</amq:transportConnectors>
</amq:broker>
但是不推荐这种方式,建议分开来启动。版权所有,转载请注明来源: