heliang0 2015-09-10
package com.gosun.activemq;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
/**
* 消息队列模型发送消息至activeMQ
* @author Ickes
*/
public class QueueSend {
public static void main(String[] args) throws Exception {
//第一步:根据url创建一个jms Connection。
ActiveMQConnectionFactory connectionfactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionfactory.createConnection();
connection.start();
//第二步:根据connection获取session
Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//第三步:消息的目的地
Destination destination = new ActiveMQQueue("gosun");
//第四步:创建消息生产者
MessageProducer producer = session.createProducer(destination);
//设置不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//第五步:创建消息
Message msg = session.createTextMessage("JMS 告诉你我是ICKES");
//第六步:生产者向JMS发送消息到队列
producer.send(msg);
//第七步:关闭连接
session.close();
connection.close();
}
}第二步:Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);解释如下
在connection的基础上创建一个session,同时设置是否支持事务ACKNOWLEDGE标识。
package com.gosun.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息消费者,手动接收示例
* @author Ickes
*/
public class QueuesAccept {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = connectionFactory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
//消息目的地
Destination dest = session.createQueue("gosun");
//消息消费者
MessageConsumer consumer = session.createConsumer(dest);
//接收消息,超时时间为10秒,先手动接受JMS消息,这儿可以用监听
TextMessage textMessage = (TextMessage) consumer.receive(10*1000);
String text = textMessage.getText();
System.out.println("接收到的消息为:" + text);
//关闭通道
consumer.close();
session.close();
conn.close();
}
}package com.gosun.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 使用监听器,自动接收消息
* @author Lenovo
*
*/
public class QueuesAcceptListener implements MessageListener{
@Override
public void onMessage(Message message) {
TextMessage text = (TextMessage) message;
try {
System.out.println(text.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* 测试代码
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = connectionFactory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
//消息目的地
Destination dest = session.createQueue("gosun");
//消息消费者
MessageConsumer consumer = session.createConsumer(dest);
consumer.setMessageListener(new QueuesAcceptListener());
//这里不能关闭连接,一旦关闭监听器也就关闭,那就接收不到消息了
}
}package com.gosun.activemq.topic;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
/**
* 生产者发送消息到主题中
* @author Ickes
*
*/
public class TopicSend {
public static void main(String[] args) throws Exception {
// 第一步:根据url创建一个jms Connection。
ActiveMQConnectionFactory connectionfactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionfactory.createConnection();
connection.start();
// 第二步:根据connection获取session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 第三步:创建一个Topic
Topic topic= new ActiveMQTopic("testTopic");
// 第四步:创建生产者用于将消息发送至主题
MessageProducer producer = session.createProducer(topic);
// 设置不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 第五步:创建消息
Message msg = session.createTextMessage("JMS 告诉你我是ICKES");
producer.send(msg);
//第七步:关闭连接
session.close();
connection.close();
}
}package com.gosun.activemq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消费者从主题中订阅消息
* @author Ickes
*
*/
public class TopicAccept {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = connectionFactory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
//主题目的地
Topic topic =session.createTopic("testTopic");
//注册订阅者
MessageConsumer consumer = session.createConsumer(topic);
//手动获取消息
TextMessage textMessage = (TextMessage) consumer.receive(10*1000);
String text = textMessage.getText();
System.out.println("接收到的消息为:" + text);
//关闭通道
consumer.close();
session.close();
conn.close();
}
}