Java高知 2015-04-24
最近项目里要求对数据做实时分析,唉,Storm的学习成本太高,所以就想到了ActiveMQ。Apache ActiveMQ是最流行、功能强大的开源消息传递与集成模式服务器。它速度快,支持多种跨语言的客户端和协议,提供易于使用的企业集成模式和许多高级特性,同时完全支持JMS 1.1和J2EE 1.4。废话不多说,直接上demo了。
=============================== action start =========================
一:必备jar包
<!-- ActiveMQ bundle -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
</dependency>
<!-- Provides org.apache.activemq.pool.PooledConnectionFactory used in the Spring configuration -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.9.0</version>
</dependency>
二:服务端
服务端包含4部分:
a.消息转换器
b.消息发布者
c.消息发送
d.activemq.xml配置
a.消息转换器
package cn.innosoft.jt809.activemq.converter;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import cn.innosoft.jt809.biz.dynamic.model.TGnssGpsHis;
/**
* jms消息转换器
* @author gaoq
* @date 2015-3-27 下午2:57:23
*/
public class MsgConverter implements MessageConverter{
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
if (!(object instanceof GpsVehicleReceive)) {
throw new MessageConversionException("obj is not MsgPojo");
}
GpsVehicleReceive msgPojo = (GpsVehicleReceive) object;
TextMessage textMessage = session.createTextMessage();
String msg = getGpsMsgString(msgPojo);
textMessage.setText(msg);
return textMessage;
}
@Override
public Object fromMessage(Message message) throws JMSException, MessageConversionException {
if (!(message instanceof TextMessage)) {
throw new MessageConversionException("Message is not TextMessage");
}
TextMessage textMessage = (TextMessage) message;
TGnssGpsHis msg = new TGnssGpsHis();
String[] texts=textMessage.getText().split(",");
msg.setX(1);
} catch (ParseException e) {
e.printStackTrace();
}
return msg;
}
/**
* 获取对象转换后的字符串.
* @param msgPojo TGnssGpsHis
* @return String
*/
private String getGpsMsgString(GpsVehicleReceive msgPojo) {
String msg = msgPojo.getX()+","+msgPojo.getY();
return msg;
}
}
b.消息发布者
package cn.innosoft.jt809.activemq.service;
import javax.jms.Destination;
import org.springframework.jms.core.JmsTemplate;
import cn.innosoft.jt809.activemq.converter.GpsVehicleReceive;
/**
 * Message publisher.
 *
 * <p>Sends {@link GpsVehicleReceive} payloads to the configured destination through
 * a {@link JmsTemplate}. Both collaborators are supplied through the setters.
 *
 * @author gaoq
 * @date 2015-3-19 11:12:00
 */
public class TopicPublisherService {

    /** Template that performs the conversion and the send. */
    JmsTemplate jmsTemplate;

    /** Destination that messages are published to. */
    Destination destination;

    /**
     * Sets the template used for sending.
     *
     * @param jmsTemplate the JMS template
     */
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    /**
     * Sets the destination messages are published to.
     *
     * @param destination the target destination
     */
    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    /**
     * Converts the payload with the template's message converter and publishes it.
     *
     * @param msgPojo the payload to publish
     */
    public void convertAndSend(GpsVehicleReceive msgPojo) {
        jmsTemplate.convertAndSend(destination, msgPojo);
    }

    // Alternative kept from the original for reference: sending a plain string
    // through an explicit MessageCreator instead of the message converter.
    //
    // public void send(final String msg) {
    //     MessageCreator messageCreator = new MessageCreator() {
    //         public Message createMessage(Session session) throws JMSException {
    //             TextMessage message = session.createTextMessage();
    //             message.setText(msg);
    //             return message;
    //         }
    //     };
    //     jmsTemplate.send(this.destination, messageCreator);
    // }
}
c.消息发送(在应用类中,获取数据并发送)
/**
 * Builds a demo payload and publishes it.
 *
 * <p>In this demo the incoming record is not read; fixed sample values are sent.
 *
 * @param gpsHis the source record (unused here)
 */
private static void sendMsg(TGnssGpsHis gpsHis) {
    GpsVehicleReceive payload = new GpsVehicleReceive();
    payload.setX(1);
    payload.setY(2);
    // Publish the payload.
    topicPublisherService.convertAndSend(payload);
}
d.activemq.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
       http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <!-- Pooled connection factory wrapping the ActiveMQ connection factory.
         destroy-method="stop" releases the pooled connections when the Spring context closes. -->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:9220" />
            </bean>
        </property>
    </bean>

    <!-- Destination (topic) that messages are sent to -->
    <bean id="topicSubscriberMessageListenerDest" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="myMessageListenerTopic" />
    </bean>

    <bean id="msgConverter" class="cn.innosoft.jt809.activemq.converter.MsgConverter"></bean>

    <!-- JmsTemplate for the topic -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="topicSubscriberMessageListenerDest" />
        <!-- Whether to use publish/subscribe mode; defaults to false -->
        <property name="pubSubDomain" value="true"/>
        <!-- Message converter -->
        <property name="messageConverter" ref="msgConverter"></property>
        <!--<property name="receiveTimeout" value="10000" /> -->
    </bean>

    <bean id="topicPublisherService" class="cn.innosoft.jt809.activemq.service.TopicPublisherService">
        <property name="jmsTemplate" ref="jmsTopicTemplate"/>
        <property name="destination" ref="topicSubscriberMessageListenerDest" />
    </bean>
</beans>
三:客户端使用
客户端使用包含2部分:
a.监听类
b.activeMq.xml
a.监听类
package cn.innosoft.exceptionAnalyse.activemq.service;
import java.text.SimpleDateFormat;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import cn.innosoft.exceptionAnalyse.biz.AnalyseMsgMng;
import cn.innosoft.exceptionAnalyse.biz.cache.model.GpsVehicleReceive;
/**
* 经度异常接收数据监听类.
* @author gaoq
* @date 2015-3-27 下午4:32:30
*/
public class TopicColorExceptionService implements MessageListener{
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
GpsVehicleReceive msg = bindProperties(textMessage);
// AnalyseMsgMng.getInstance().colorQueue.put(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 绑定参数到对象上.
* @param textMessage 消息.
* @return GpsVehicleReceive
*/
private GpsVehicleReceive bindProperties(TextMessage textMessage) throws Exception{
GpsVehicleReceive msg = new GpsVehicleReceive();
String[] texts=textMessage.getText().split(",");
msg.setX1(texts[0]);
msg.setX2(texts[1]);
msg.setX3(texts[2]);
return msg;
}
}
b.activeMq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
       http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <!-- Connection to the broker -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:9220"/>
    </bean>

    <!-- Destination (topic) the listener subscribes to -->
    <bean id="topicSubscriberMessageListenerDest" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="myMessageListenerTopic"/>
    </bean>

    <!-- <bean id="msgConverter" class="cn.innosoft.freightAnalyse.activemq.converter.MsgConverter"></bean> -->

    <!-- JmsTemplate for the topic -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="topicSubscriberMessageListenerDest"/>
        <!-- Whether to use publish/subscribe mode; defaults to false -->
        <property name="pubSubDomain" value="true"/>
    </bean>

    <!-- Listener implementation -->
    <bean id="topicColorException"
          class="cn.innosoft.exceptionAnalyse.activemq.service.TopicColorExceptionService"/>

    <!-- Container that delivers messages from the topic to the listener -->
    <bean id="myMsgTopiclistenerContainerColor"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="topicSubscriberMessageListenerDest"/>
        <property name="messageListener" ref="topicColorException"/>
        <property name="pubSubDomain" value="true"/>
    </bean>
</beans>
============================= action end ==========================
综上所述:一个完整的activeMQ的使用方式介绍完了。
接下来干啥呢。。。。。
想起来了。。。。activeMQ的服务还要启动,那么如何启动activeMQ的服务呢?
https://repository.apache.org/content/repositories/releases/org/apache/activemq/
下载了activeMQ之后
apache-activemq-5.9.0-bin.zip
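解压后会发现apache-activemq-5.9.0\bin目录下有win32和win64两个子目录,进入哪一个取决于服务器的操作系统位数。我的是64位,所以进入apache-activemq-5.9.0\bin\win64目录,双击activemq.bat启动服务。注意:ActiveMQ默认的连接端口是61616,而上面demo中brokerURL用的是9220,需要把conf/activemq.xml里transportConnector的端口改成9220(或者把brokerURL改成61616)。如果启动无异常,就可以在浏览器中访问http://localhost:8161/admin,用户名:admin,密码:admin(这是默认值,如需修改,可以在conf/jetty-realm.properties中更改)。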
大功告成。。。。。。。。。。。。。。。