分布式消息系统RocketMQ

BlogForUS 2016-03-11

分布式消息系统RocketMQ

 一、消息系统

RocketMQ是阿里开源出来的一个消息系统,2011年Linkin开源了Kafka,淘宝中间件团队在对Kafka做过充分Review之后 ,由于kafka是用scala开发的,于是重新用Java语言编写了RocketMQ 。定位于非日志的可靠消息传输 ,被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景 。

二、RocketMQ架构图


分布式消息系统RocketMQ
 

Producer:消息生产者,负责产生消息,一般有业务系统负责产生消息

Consumer:   消息消费者,负责消费消息,一般是后台系统负责异步消费

Broker: 消息中转角色,负责储存消息,转发消息,一般也称为Server

nameServer: broker注册topic信息到nameServer,producer和consumer从nameServer获取topic的路由信息

2.1 生产者

① Producer与NameServer集群,中的其中一个节点,建立长连接;

② 定期从NameServer取Topic路由信息;

③ 并向提供Topic服务的Master建立长连接,且定时向Master发送心跳;

④ Produce完全无状态,可集群部署;

2.2 消费者

① Consumer与NameServer集群,中的其中一个节点,建立长连接;

② 定期从NameServer取Topic路由信息;

③ 并向提供Topic服务的Master、Slaver建立长连接,且定时向Master、Slaver发送心跳;

④ Consumer即可从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定;

2.3 Broker

① 每个broker与nameServer集群,中的所有节点,建立长连接;

② 定时注册Topic信息到所有的NameServer;

三、RocketMQ使用

3.1 安装

https://github.com/alibaba/RocketMQ/releases

tar zxvf alibaba-rocketmq-3.2.6.tar.gz
ROCKETMQ_HOME=/root/mq/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
 

3.2 修改配置文件

/root/mq/rocketmq/conf/


分布式消息系统RocketMQ
 

3.3 启动nameServer

cd /home/dev/alibaba-rocketmq/bin
nohup sh mqnamesrv

3.4 启动broker

cd /home/dev/alibaba-rocketmq/bin

nohup sh mqbroker -c /home/dev/alibaba-rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &

3.5 应用消费端

<bean id="consumerNotifyManager" class="com.mengka.mq.client.NotifyManagerBean"
          init-method="init">
        <property name="groupId" value="OPEN_PROJECT_C_GROUP_TEST" />
        <property name="name" value="open_project" />
        <property name="topic" value="ACCTRANS-CHANGE-MESSAGE"/>
        <property name="ctype" value="PUSH"/>
        <property name="namesrvAddr" value="127.0.0.1:9876"/>
        <property name="messageListener" ref="taaMessageListener" />
</bean>
/**
 * User: mengka
 * Date: 15-4-16-下午3:40
 */
@Component
public class TaaMessageListener implements MessageListenerConcurrently {

    private static final Logger log = LoggerFactory.getLogger(TaaMessageListener.class);

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> list,
            ConsumeConcurrentlyContext Context) {
        for (MessageExt messageExt : list) {
            log.info("---------------, receive message, id = " + messageExt.getMsgId() + " , ip = " + messageExt.getBornHost() + " , tags = " + messageExt.getTags() + " , conetnt = " + new String(messageExt.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}

3.6 生产端

<bean id="producterNotifyManager" class="com.mengka.mq.client.NotifyManagerBean"
          init-method="initProducter">
        <property name="groupId" value="P-ACCTRANS-CHANGE-MESSAGE" />
        <property name="name" value="open_project" />
        <property name="topic" value="ACCTRANS-CHANGE-MESSAGE"/>
        <property name="namesrvAddr" value="127.0.0.1:9876"/>
</bean>
String serviceConfigXMLs[] = new String[]{"rocketmq/rocketmq-push-context.xml"};
        ApplicationContext context = new ClassPathXmlApplicationContext(serviceConfigXMLs);
        NotifyManager producterNotifyManager = (NotifyManager) context.getBean("producterNotifyManager");

 String key1 = "aa";
 String tag1 = "mkTag";
 String content = "Just for test[" + TimeUtil.toDate(new Date(), TimeUtil.format_1);
Message message1 = new StringMessage(key1,tag1,content);
SendResult result1 = producterNotifyManager.sendMessage(message1);

相关推荐