陈晨软件五千言 2020-06-17
RocketMq producer 在发送一条消息时候,从 producer --nameSrv -- Broker 中间经过了什么样子的数据交互
如下是 Producer 发送消息的一个demo例子:
//1. 初始化 mq producer
DefaultMQProducer mqProducer =new DefaultMQProducer("iscys-test");
//2.设置nameServer 地址
mqProducer.setNamesrvAddr("localhost:9876");
//3. 开启mq producer,这一步是必须的,会做一些连接初始化检测工作
mqProducer.start();
//4.创建 Message
Message msg = new Message("test-topis", "iscys-test".getBytes());
//5.发送消息,设置回调,消息发送成功会回调函数
mqProducer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//在消息发送成功之后,我们收到broker的响应通知后,会进行回调
System.out.println("send success");
}
@Override
public void onException(Throwable e) {
System.out.println("send fail");
}
});构建发送消息:
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
try {
//默认异步发送,超时3s
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
} catch (MQBrokerException e) {
throw new MQClientException("unknownn exception", e);
}
}从NameSrv 中获取topic 配置的相关信息,比如 broker 地址,队列数 之类的。
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
//1.尝试取获取从NameSrv 中获取topic 相关信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//2.选择一个消息队列,默认为4个,在创建新的Topic时候
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
//3.发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:主要看一下如上代码第一步 尝试获取Topic 信息 tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//1.从 topicPublishInfoTable 从尝试从Map中获取,如果没有获取到,请求NameSrv
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//2.从NameSrv 中拉取topic 信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
//3.说明获取到TOPIC 的信息
return topicPublishInfo;
} else {
//4.如果第2步执行后 NameSrv 中没有topic 信息,获取默认的TBW102 topic 的信息,这个是肯定能获取到的
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}1. 会先从 topicPublishInfoTable 缓存中获取topic 配置信息
2.缓存没有,就从NameSrv 中拉取。
3.如果获取到了,则返回。
4.NameSrv 没有得到相关到topic 信息,说明是新到topic ,则就请求获取TBW102 topic 配置信息,这个肯定能获取到,封装使用TBW102的配置。
请求NameSrv 非默认的topic
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
//1.非默认的topic ,默认Topic 为TBW102
return updateTopicRouteInfoFromNameServer(topic, false, null);
}执行从NameSrv 获取topic 请求
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// 1.如果请求的是默认的Topic 请求会走到这里
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
// 2.新的Topic 会先从NameSrv 中获取一遍,如果NameSrv 中没有获取到,会抛出异常
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}1. 从NameSrv 中获取到 TBW102 的topic 信息,这个一般都是有的。
2. 新的topic 会从NameSrv 中获取信息,如果不存在,返回false。
获取到topic信息后封装成 TopicPublishInfo:
public class TopicPublishInfo {
private boolean orderTopic = false;
//用来检测Topic 在Broker 真实存在的,不存在false
private boolean haveTopicRouterInfo = false;
//消息队列的
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
//请求NameSrv 返回的TOPIC 具体信息
private TopicRouteData topicRouteData;