Spring-ActiveMQ的点对点和Topic

yanghuashuiyue 2013-09-29

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example.activemq</groupId>
  <artifactId>activemq-test</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>activemq-test</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <org.springframework.version>3.1.1.RELEASE</org.springframework.version>
  </properties>
  
<repositories>
  <repository>
	<id>kxcomm-maven</id>
	<name>Maven kxcomm Repository</name>
	<url>http://122.13.0.56:8088/nexus/content/groups/public/</url>
	<releases>
		<enabled>true</enabled>
	</releases>
	<snapshots>
		<enabled>true</enabled>
	</snapshots>
</repository>   
</repositories>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
			<groupId>commons-collections</groupId>
			<artifactId>commons-collections</artifactId>
			<version>3.2</version>
		</dependency>
		<dependency>
			<groupId>commons-configuration</groupId>
			<artifactId>commons-configuration</artifactId>
			<version>1.6</version>
		</dependency>
		<dependency>
			<groupId>commons-io</groupId>
			<artifactId>commons-io</artifactId>
			<version>1.3.2</version>
		</dependency>
		<dependency>
			<groupId>commons-logging</groupId>
			<artifactId>commons-logging</artifactId>
			<version>1.1.1</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
		<dependency>
			<groupId>commons-beanutils</groupId>
			<artifactId>commons-beanutils</artifactId>
			<version>1.8.3</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-asm</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>${org.springframework.version}</version>
		</dependency>
		<dependency>
			<groupId>com.davidkarlsen.commonstransaction.spring</groupId>
			<artifactId>commons-transaction-spring</artifactId>
			<version>0.9</version>
		</dependency>
		<dependency>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
			<version>4.2-beta1</version>
		</dependency>
		<dependency>
			<groupId>fastutil</groupId>
			<artifactId>fastutil</artifactId>
			<version>5.0.9</version>
		</dependency>

		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>0.9.27</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>0.9.27</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.6.1</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>jcl-over-slf4j</artifactId>
			<version>1.6.1</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.5</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.8.0</version>
		</dependency>
  </dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:tx="http://www.springframework.org/schema/tx" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation=" 
                http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/aop
                http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
                http://www.springframework.org/schema/tx
                http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
                http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context-2.5.xsd"
                 default-autowire="byName" default-lazy-init="true">

<import resource="activemq-test.xml"/>

</beans>
<?xml version="1.0" encoding="UTF-8"?>

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

	<!-- 创建工厂连接 -->
	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616" />
	</bean>
	
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
		 <property name="defaultDestination" ref="rantzDestination" />  
	</bean>
	
	<!-- Point-to-Point -->
	<!-- activeMQ消息目标 队列 -->
	<bean id="rantzDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="rantz.marketing.queue"></constructor-arg>
	</bean>
	
	<!-- activeMQ消息目标 主题-->
        <!--	<bean id="rantzDestination" class="org.apache.activemq.command.ActiveMQTopic">-->
        <!--		<constructor-arg index="0" value="rantz.marketing.queue"></constructor-arg>-->
        <!--	</bean>-->
        
	<bean id="producer" class="activemq.test.p2p.producer.RantzMarketingGatewayImpl">
		<property name="jmsTemplate" ref="jmsTemplate" />
		<property name="destination" ref="rantzDestination" />
	</bean>
	<bean id="consumer" class="activemq.test.p2p.consumer.MarketingReceiverGatewayImpl">  
	    <property name="jmsTemplate" ref="jmsTemplate" />  
	</bean>  
	<!-- Point-to-Point End-->
	
	
	<!-- Topic -->
	<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor">
        <constructor-arg index="0" value="kxcomm.mms.topic" />
    </bean>
    <bean id="control" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor">
        <constructor-arg index="0" value="kxcomm.mms.control" />
    </bean>
	
	<bean id="myListener" class="activemq.test.topic.MyListener">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="topic" ref="topic" />
    <property name="control" ref="control" />
    </bean>
   
   <bean id="myPublisher" class="activemq.test.topic.MyPublisher">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="topic" ref="topic" />
    <property name="control" ref="control" />
    </bean>
	<!-- Topic End-->

</beans>
package activemq.test.model;

import java.io.Serializable;

public class User implements Serializable{
	private static final long serialVersionUID = -3098636047897519268L;
	private String name;
	private String sex;
	private int age;
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public String getSex() {
		return sex;
	}
	public void setSex(String sex) {
		this.sex = sex;
	}
	public int getAge() {
		return age;
	}
	public void setAge(int age) {
		this.age = age;
	}
	@Override
	public String toString() {
		return "User [name=" + name + ", sex=" + sex + ", age=" + age + "]";
	}
	
}

PTP模型

PTP(Point-to-Point)模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和邮件系统中的邮箱一样,队列可以包含各种消息,JMSProvider提供工具管理队列的创建、删除。JMSPTP模型定义了客户端如何向队列发送消息,从队列接收消息,浏览队列中的消息。

package activemq.test.p2p.consumer;

import org.springframework.jms.core.JmsTemplate;

import activemq.test.model.User;
public class MarketingReceiverGatewayImpl {
	
	private JmsTemplate jmsTemplate;
	
	public JmsTemplate getJmsTemplate() {
		return jmsTemplate;
	}
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	
	public MarketingReceiverGatewayImpl() {
	}
	
	public void receiveMotorist() throws Exception{
		User message  = (User)jmsTemplate.receiveAndConvert();
		System.out.println("reviced msg is:" + message.toString());
	}
	
}
package activemq.test.p2p.consumer;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartConsumer {
	public static void main(String[] args) {
		/*开始加载spring配置文件*/
			ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
			MarketingReceiverGatewayImpl rantzMarketingGateway= (MarketingReceiverGatewayImpl) context.getBean("consumer");
			System.out.println("Receive Start ...");
			try {
				while(true){
					rantzMarketingGateway.receiveMotorist();
				}
				
			} catch (Exception e) {
				e.printStackTrace();
			}
	    }
}
package activemq.test.p2p.producer;

public interface IRantzMarketingGateway {
	/**
	 * 
	 * 发送文本对象
	 * 
	 * @author zhangjh 新增日期:2013-9-20
	 * @since smsc-gateway
	 */
	public void sendMotoristInfo();

	/**
	 * 
	 * 发送对象
	 * 
	 * @author zhangjh 新增日期:2013-9-20
	 * @since smsc-gateway
	 */
	public void sendObjectInfo();
}
package activemq.test.p2p.producer;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import activemq.test.model.User;

public class RantzMarketingGatewayImpl implements IRantzMarketingGateway {

	private JmsTemplate jmsTemplate;
	private Destination destination;

	public JmsTemplate getJmsTemplate() {
		return jmsTemplate;
	}

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	public Destination getDestination() {
		return destination;
	}

	public void setDestination(Destination destination) {
		this.destination = destination;
	}

	public void sendMotoristInfo() {
		MessageCreator msg = new MessageCreator(){
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage("这是一个测试,"+System.currentTimeMillis());
			}
		};
		jmsTemplate.send(destination, msg);
	}

	
	public void sendObjectInfo() {
		User u = new User();
		u.setAge(17);
		u.setName("yuky"+System.currentTimeMillis());
		u.setSex("女");
		jmsTemplate.convertAndSend(u);
		
	}
}
package activemq.test.p2p.producer;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartProducer {
	public static void main(String[] args) {
		/*开始加载spring配置文件*/
			ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
			IRantzMarketingGateway rantzMarketingGateway= (RantzMarketingGatewayImpl) context.getBean("producer");
			for(int i=0;i<10;i++){
				rantzMarketingGateway.sendObjectInfo();
				System.out.println("Start ...");
			}
			
		}
}

PUB/SUB模型

消息订阅分为非持久订阅(non-durablesubscription)和持久订阅(durablesubscrip-tion),非持久订阅只有当客户端处于激活状态,也就是和JMSProvider保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向JMS注册一个识别自己身份的ID,当这个客户端处于离线时,JMSProvider会为这个ID保存所有发送到主题的消息,当客户再次连接到JMSProvider时,会根据自己的ID得到所有当自己处于离线时发送到主题的消息。

package activemq.test.topic;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import activemq.test.model.User;

public class MyListener implements MessageListener {
	private ActiveMQConnectionFactory connectionFactory;
	private Connection connection;
	private Session session;
	private MessageProducer producer;
	private Topic topic;
    private Topic control;

	public Topic getTopic() {
		return topic;
	}

	public void setTopic(Topic topic) {
		this.topic = topic;
	}

	public Topic getControl() {
		return control;
	}

	public void setControl(Topic control) {
		this.control = control;
	}

	public ActiveMQConnectionFactory getConnectionFactory() {
		return connectionFactory;
	}

	public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
		this.connectionFactory = connectionFactory;
	}

	public void onMessage(Message message) {
		try{
			if (checkText(message, "SHUTDOWN")) {
	            try {
	                connection.close();
	                System.out.println("退出监听消息");
	            } catch (Exception e) {
	                e.printStackTrace(System.out);
	            }

	        } else if (checkText(message, "REPORT")) {
	            // send a report:
	            try {
	            	 System.out.println("MyListener->收到 a report");
	                long time = System.currentTimeMillis();
	                String msg = "MyListener->返回 a report :" + time + "ms";
	                System.out.println(msg);
	                producer.send(session.createTextMessage(msg));
	            } catch (Exception e) {
	                e.printStackTrace(System.out);
	            }
	        } else {
	        	ObjectMessage obj = (ObjectMessage)message;
	        	User u = (User) obj.getObject();
	        	System.out.println("Received  messages."+ u.toString());
	        }
		}catch(Exception e){
			
		}
	}

	public void run() throws JMSException {
		if(connectionFactory!=null){
			System.out.println("connectionFactory is ok");
			connection = connectionFactory.createConnection();
	        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	        MessageConsumer consumer = session.createConsumer(topic);
	        consumer.setMessageListener(this);
	        connection.start();
	        
	        producer = session.createProducer(control);
	        System.out.println("Waiting for messages...");
		}
	}
	
	private static boolean checkText(Message m, String s) {
        try {
            return m instanceof TextMessage && ((TextMessage)m).getText().equals(s);
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            return false;
        }
    }
	
}
package activemq.test.topic;



import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartListener {

	public static void main(String[] args) {
		/*开始加载spring配置文件*/
			ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
			MyListener myListener= (MyListener) context.getBean("myListener");
		
			try {
				if(myListener!=null){
					System.out.println("success...");
				}
				myListener.run();
				
			} catch (Exception e) {
				e.printStackTrace();
			}
	    }

}
package activemq.test.topic;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import activemq.test.model.User;

public class MyPublisher implements MessageListener {
	private ActiveMQConnectionFactory connectionFactory;
	private Connection connection;
	private Session session;
	private MessageProducer publisher;
	private Topic topic;
	private Topic control;
	private final Object mutex = new Object();

	public ActiveMQConnectionFactory getConnectionFactory() {
		return connectionFactory;
	}

	public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
		this.connectionFactory = connectionFactory;
	}

	public Topic getTopic() {
		return topic;
	}

	public void setTopic(Topic topic) {
		this.topic = topic;
	}

	public Topic getControl() {
		return control;
	}

	public void setControl(Topic control) {
		this.control = control;
	}

	
	public void onMessage(Message message) {
        synchronized (mutex) {
            System.out.println("Received report " + getReport(message) );
            
        }
    }
	
	Object getReport(Message m) {
        try {
            return ((TextMessage)m).getText();
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            return e.toString();
        }
    }

	public void publish() throws Exception {
		 	User u = new User();
			u.setAge(17);
			u.setName("yuky"+System.currentTimeMillis());
			u.setSex("女");
	        // send events
			ObjectMessage obj = session.createObjectMessage();
			obj.setObject(u);
	        for (int i = 0; i < 10; i++) {
	            publisher.send(obj);
	            publisher.send(session.createTextMessage("REPORT"));
	        }
	    }
	 
	public void run() throws Exception {
		connection = connectionFactory.createConnection();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		publisher = session.createProducer(topic);
		publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		
		session.createConsumer(control).setMessageListener(this);
        connection.start();
	}
	
	public void stop() throws JMSException{
		publisher.send(session.createTextMessage("SHUTDOWN"));
		connection.stop();
        connection.close();
	}
}
package activemq.test.topic;

import javax.jms.JMSException;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartPublisher {
	public static void main(String[] args) throws InterruptedException {
		/*开始加载spring配置文件*/
			ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
			MyPublisher publisher= (MyPublisher) context.getBean("myPublisher");
			try {
				publisher.run();
				publisher.publish();
			} catch (Exception e) {
				try {
					publisher.stop();
				} catch (JMSException e1) {
					e1.printStackTrace();
				}
				e.printStackTrace();
			}
			
    }
}

相关推荐

xinglun / 0评论 2020-06-14