heliang0 2015-09-10
package com.gosun.activemq; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; /** * 消息队列模型发送消息至activeMQ * @author Ickes */ public class QueueSend { public static void main(String[] args) throws Exception { //第一步:根据url创建一个jms Connection。 ActiveMQConnectionFactory connectionfactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionfactory.createConnection(); connection.start(); //第二步:根据connection获取session Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //第三步:消息的目的地 Destination destination = new ActiveMQQueue("gosun"); //第四步:创建消息生产者 MessageProducer producer = session.createProducer(destination); //设置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //第五步:创建消息 Message msg = session.createTextMessage("JMS 告诉你我是ICKES"); //第六步:生产者向JMS发送消息到队列 producer.send(msg); //第七步:关闭连接 session.close(); connection.close(); } }
第二步:Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);解释如下
在connection的基础上创建一个session,同时设置是否支持事务ACKNOWLEDGE标识。
package com.gosun.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者,手动接收示例 * @author Ickes */ public class QueuesAccept { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = connectionFactory.createConnection(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); //消息目的地 Destination dest = session.createQueue("gosun"); //消息消费者 MessageConsumer consumer = session.createConsumer(dest); //接收消息,超时时间为10秒,先手动接受JMS消息,这儿可以用监听 TextMessage textMessage = (TextMessage) consumer.receive(10*1000); String text = textMessage.getText(); System.out.println("接收到的消息为:" + text); //关闭通道 consumer.close(); session.close(); conn.close(); } }
package com.gosun.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 使用监听器,自动接收消息 * @author Lenovo * */ public class QueuesAcceptListener implements MessageListener{ @Override public void onMessage(Message message) { TextMessage text = (TextMessage) message; try { System.out.println(text.getText()); } catch (JMSException e) { e.printStackTrace(); } } /** * 测试代码 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = connectionFactory.createConnection(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); //消息目的地 Destination dest = session.createQueue("gosun"); //消息消费者 MessageConsumer consumer = session.createConsumer(dest); consumer.setMessageListener(new QueuesAcceptListener()); //这里不能关闭连接,一旦关闭监听器也就关闭,那就接收不到消息了 } }
package com.gosun.activemq.topic; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTopic; /** * 生产者发送消息到主题中 * @author Ickes * */ public class TopicSend { public static void main(String[] args) throws Exception { // 第一步:根据url创建一个jms Connection。 ActiveMQConnectionFactory connectionfactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionfactory.createConnection(); connection.start(); // 第二步:根据connection获取session Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); // 第三步:创建一个Topic Topic topic= new ActiveMQTopic("testTopic"); // 第四步:创建生产者用于将消息发送至主题 MessageProducer producer = session.createProducer(topic); // 设置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 第五步:创建消息 Message msg = session.createTextMessage("JMS 告诉你我是ICKES"); producer.send(msg); //第七步:关闭连接 session.close(); connection.close(); } }
package com.gosun.activemq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消费者从主题中订阅消息 * @author Ickes * */ public class TopicAccept { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = connectionFactory.createConnection(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); //主题目的地 Topic topic =session.createTopic("testTopic"); //注册订阅者 MessageConsumer consumer = session.createConsumer(topic); //手动获取消息 TextMessage textMessage = (TextMessage) consumer.receive(10*1000); String text = textMessage.getText(); System.out.println("接收到的消息为:" + text); //关闭通道 consumer.close(); session.close(); conn.close(); } }