MojitoBlogs 2020-02-17
第一步:创建很普通的 SpringBoot 项目
第二步:加入相关依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>第三步:写代码

PayProducer 类如下所示:
package net.xdclass.xdclassmq.jms;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;
@Component
public class PayProducer {
private String producerGroup = "pay_group";
private String nameServerAddr = "192.168.0.104:9876";
private DefaultMQProducer producer;
public PayProducer() {
producer = new DefaultMQProducer(producerGroup);
//指定NameServer地址,多个地址以 ; 隔开
//如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
producer.setNamesrvAddr(nameServerAddr);
start();
}
public DefaultMQProducer getProducer() {
return this.producer;
}
/**
* 对象在使用之前必须要调用一次,只能初始化一次
*/
public void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 一般在应用上下文,使用上下文监听器,进行关闭
*/
public void shutdown() {
this.producer.shutdown();
}
}PayController 类如下所示:
package net.xdclass.xdclassmq.controller;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
@RestController
public class PayController {
@Autowired
private PayProducer payProducer;
private static final String topic = "pay_test_topic";
@RequestMapping("/api/v1/pay_cb")
public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message message = new Message(topic,"taga", ("hello rocketmq = "+text).getBytes() );
SendResult sendResult = payProducer.getProducer().send(message);
System.out.println(sendResult);
return new HashMap<>();
}
}第四步:测试
通过可视化管理后台查看消息


Message对象
注意:发送消息到 Broker 前,需要判断是否有此 Topic。启动 Broker 的时候,本地环境建议开启自动创建 Topic,生产环境建议关闭自动化创建 Topic。建议先手工创建 Topic,如果靠程序自动创建,然后再投递消息,会出现延迟情况。自动创建topic: autoCreateTopicEnable=true 无效原因:客户端版本要和服务端版本保持一致。
概念模型: 一个 Topic 下面对应多个 Queue,可以在创建 Topic 时指定,如订单类 Topic。
常见错误一
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
原因:阿里云存在多网卡,rocketmq都会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,很有可能会有问题。比如,我遇到的问题是我机器上有两个IP,一个公网IP,一个私网IP, 因此需要配置broker.conf 指定当前的公网ip, 然后重新启动broker 新增配置:conf/broker.conf (属性名称brokerIP1=broker所在的公网ip地址 ) 新增这个配置:brokerIP1=120.76.62.13 启动命令:nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
常见错误二
MQClientException: No route info of this topic, TopicTest1 原因:Broker 禁止自动创建 Topic,且用户没有通过手工方式创建 此Topic, 或者broker和Nameserver网络不通 解决: 通过 sh bin/mqbroker -m 查看配置 autoCreateTopicEnable=true 则自动创建topic Centos7关闭防火墙 systemctl stop firewalld
常见错误三
控制台查看不了数据,提示连接 10909错误 原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909 解决:阿里云安全组需要增加一个端口 10909
其他错误:
https://blog.csdn.net/qq_14853889/article/details/81053145 https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8%B8%E8%AF%B4%E6%98%8E https://www.jianshu.com/p/bfd6d849f156 https://blog.csdn.net/wangmx1993328/article/details/81588217
接着上面的工程,直接上代码,PayConsumer 类如下所示:
package net.xdclass.xdclassmq.jms;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.List;
@Component
public class PayConsumer {
private DefaultMQPushConsumer consumer;
private String CONSUMER_GROUP = "pay_consumer_group";
private String NAME_SERVER = "192.168.0.104:9876";
private String TOPIC = "pay_test_topic";
public PayConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(this.NAME_SERVER);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(this.TOPIC, "*");
// consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// try {
// Message msg = msgs.get(0);
// System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
// String topic = msg.getTopic();
// String body = new String(msg.getBody(), "utf-8");
// String tags = msg.getTags();
// String keys = msg.getKeys();
// System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// } catch (UnsupportedEncodingException e) {
// e.printStackTrace();
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
// }
// });
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
Message msg = msgs.get(0);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
String topic = msg.getTopic();
String body = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
String keys = msg.getKeys();
System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
consumer.start();
System.out.println("consumer start ...");
}
}注释掉的部分采用 Lambda 表达式写法,效果是一样的。
常见问题
1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed
2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]
3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [Book-Air.local, MacBook-Air.local, MacBook-Air.local]
解决:多网卡问题处理
1、设置producer: producer.setVipChannelEnabled(false);
2、编辑ROCKETMQ 配置文件:broker.conf(下列ip为自己的ip)
namesrvAddr = 192.168.0.101:9876
brokerIP1 = 192.168.0.101
4、DESC: service not available now, maybe disk full, CL:
解决:修改启动脚本runbroker.sh,在里面增加一句话即可:
JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
(磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息)
常见问题处理
https://blog.csdn.net/sqzhao/article/details/54834761
https://blog.csdn.net/mayifan0/article/details/67633729
https://blog.csdn.net/a906423355/article/details/78192828