spring整合JMS

胡献根 2020-02-23

JMS(java消息服务)

JMS是java的一个标准,定义了使用消息代理的通用API。

Spring基于模板类JmsTemplate为JMS提供了支持

Spring还提供了消息驱动POJO的理念:这是一个简单的Java对象,它能够以异步的方式响应队列或主题上到达的消息

消息代理(message broker)和目的地(desttination)

日常收寄快递,会涉及三个对象:

  • 寄件人
  • 目的地
  • 收件人

假设没有快递企业,寄件人必须亲自跋涉千里把物品送到目的地,然后由收件人去领取。

在异步消息中,消息代理充当快递企业的角色。为消息代理指定目的地后,消息代理确保消息送达,同时解放了发送者,使其可以进行其他业务

目的地只关注消息从哪里获取,不关心由谁取走消息。它有两种同用的目的地类型:

  • 队列
  • 主题

搭建消息代理——ActiveMQ

1、配置依赖

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-spring</artifactId>
    <version>5.15.11</version>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

2、配置连接工厂

首先配置JMS连接工厂,让应用知道如何连接到ActiveMQ。

ActiveMQConnectionFactory是ActiveMQ自带的连接工厂,在Spring中配置如下:

<bean id="connectionFactory" 
    class="org.apache.activemq.ActiveMQConnectionFactory"
    p:brokerURL="tcp://localhost:61616" />

ActiveMQ代理监听默认监听localhost的61616端口,可以使用brokerURL属性来指定端口

3、声明ActiveMQ消息的目的地

如果目的地是队列类型:

<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"
    c:_0="queue" />

ActiveMQTopic(String name)构造器会创建一个队列类型的目的地,并用String对象指明队列的名字

如果目的地是队列类型:

<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic"
    c:_0="topic" />

4、使用amq命名空间简化配置

<amq:queue id="spittleQueue" physicalName="spittle.alert.queue" />

<amq:topic id="spittleTopic" physicalName="spittle.alert.topic" />

<amq:connectionFactory id="connectionFactory" 
      brokerURL="tcp://localhost:61616" />

发送和接受消息

如果不适用JmsTemplate模板,发送消息需要如下步骤:

  1. 建立ConnectionFactory工厂对象,需要填入用户名、密码、连接地址(一般使用默认,如果没有修改的话)
  2. 通过ConnectionFactory对象创建一个Connection连接,并且调用Connection的start方法开启连接,Connection方法默认是关闭的
  3. 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数1是是否启用事物,参数2是签收模式,一般设置为自动签收
  4. 通过Session对象创建Destination对象(目的地),指的是一个客户端用来制定生产消息目标和消费消息来源的对象。在PTP的模式中,Destination被称作队列,在Pub/Sub模式中,Destination被称作主题(Topic)
  5. 通过Session对象创建消息的发送和接收对象(生产者和消费者)
  6. 通过MessageProducer的setDeliverMode方法为其设置持久化或者非持久化特性
  7. 使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据。客户端同理。记得关闭
public static void main(String[] args) throws JMSException {

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
            ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616");
            
    Connection connection = connectionFactory.createConnection();
    connection.start();
    
    Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
    
    Destination destination = session.createQueue("queue");
    
    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    for (int i=0;i<=5;i++) {
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("我是第"+i+"消息");
        producer.send(textMessage);
    }
    if(connection!=null){
        connection.close();
    }
}

通过JmsTemplate模板可以简化上述代码

1、配置JmsTemplate

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
    c:_0-ref="connectionFactory"
    
    <!--可选属性, 指定默认目的地 -->
    p:defaultDestination-ref="queue" />

如果系统存在目的地,可以通过p:defaultDestination-ref将目的地bean装配进来;

此外,也可以用defaultDestinationName属性指明消息通道名称,如果命名目的地存在就使用已有的,否则创建该命名目的地.

它的messageConverter属性可以指定消息转换器,JmsTemplate在convertAndSend()方法中默认使用SimpleMessage Converter

2、发送和接受消息

import org.springframework.jms.core.JmsOperations;

public class AlertServiceImpl implements AlertService {

  private JmsOperations jmsOperations;

  public AlertServiceImpl(JmsOperations jmsOperations) {
    this.jmsOperations = jmsOperations;
  }

//  使用send方法发送消息
//  public void sendSpittleAlert(final Spittle spittle) {
//    jmsOperations.send(
//      "spittle.alert.queue", 
//      new MessageCreator() {
//        public Message createMessage(Session session) 
//                       throws JMSException {
//          return session.createObjectMessage(spittle);
//        }
//      }
//    );
//  }

//  使用convertAndSend方法发送消息
  public void sendSpittleAlert(Spittle spittle) {
    jmsOperations.convertAndSend(spittle);
  }

//  使用receive方法接收消息 
//  public Spittle getSpittleAlert() {
//    try {
//    ObjectMessage message = (ObjectMessage) jmsOperations.receive();
//    return (Spittle) message.getObject();
//    } catch (JMSException e) {
//      throw JmsUtils.convertJmsAccessException(e);
//    }
//  }

//  使用receiveAndConvert方法接收消息  
  public Spittle retrieveSpittleAlert() {
    return (Spittle) jmsOperations.receiveAndConvert();
  }
}

convertAndSend/receiveAndConvert会使用内置的消息转换器(message converter)创建消息。这两个方法在接收Object对象时,Object对象必须实现Serializable接口而且有默认构造器

使用JmsTemplate接收消息的最大缺点在于receive()和receiveAndConvert()方法都是同步的

创建消息驱动的POJO

消息驱动(MDB)来源于EJB2规范。MDB可以异步处理消息,MDB将JMS目的地中的消息作为事件,并对这些事件进行响应

MDB需要实现javax.jms.MessageListener接口的onMessage(message message)f的方法,并使用@MessageDriven(mappedname="destination")注解标注MDB

Spring提供了以POJO的方式处理消息的能力,不需要实现javax.jms.MessageListener接口

只需要把处理消息的POJO配置为消息监听器,这个POJO便能接受消息

<jms:listener-container connection-factory="connectionFactory">
    <jms:listener destination="destination"
        <!-- messageHandler是自定义的POJO消息处理器 -->
        ref="messageHandler"
        
        <!-- messageHandler是自定义的POJO消息处理器的处理方法 -->
        method="handler" />
</jms:listener-container>

消息监听器容器(message listener container)是一个特殊的bean,它可以监控JMS目的地并等待消息到达。一旦有消息到达,它取出消息,然后把消息传给任意一个对此消息感兴趣的消息监听器

connection-factory属性配置了连接工厂,容器中的每个jms:listener都使用这个连接工厂进行消息监听,connection-factory属性默认值为connectionFactory

对于jms:listener元素如果ref属性所标示的bean实现了MessageListener,那就没有必要再指定 method 属性了,默认就会调用onMessage()方法

参考

消息中间件ActiveMQ使用详解
浅谈ActiveMQ与使用

相关推荐

wuddny的blog / 0评论 2012-11-15