wxuande 2018-11-25
阅读优秀的代码是一种享受,将优秀的代码用自己的世界观优秀地描述出来就十分痛苦了是要死一亿个脑细胞的。
这篇源码阅读笔记早在一年前就有了当时只是简单的记录一下自己的总结,最近将她重新整理一下希望能帮助有需要的人。
随着移动互联网快速进入后半场,越来越多的企业将注意力转移到物联网。比如共享单车和小米的智能家居产品等都是典型的物联网应用。
企业相信借助于大数据和AI技术可以获得很多额外的价值产生新的商业模式。海量数据需要通过接入服务才能流向后端产生后续价值,在接入服务中MQTT已成为物联网中非明确的标准协议国内外云厂均有其broker实现。
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
==下文中会对上述特性的实现方式进行讲解==

客户端Client
使用MQTT的程序或者设备,如环境监控传感器、共享单车、共享充电宝等。
服务端Server
一个程序或设备,作为发送消息的客户端和请求订阅的客户端之间的中介。
发布、订阅流程
客户端-A 给 客户端-B 发送消息“hello”流程如下:
有别于HTTP协议的请求响应模式,客户端-A与客户端-B不发生直接连接关系,他们之间的消息传递通过服务端Server进行转发。
服务端Server又称 MQTT Broker 即订阅和发送的中间人
在上述的客户端-A 给 客户端-B 发送消息“hello”流程中需要有如下动作。
下面将基于连接、订阅、发布这几个动作进行源码跟踪解读。

MQTT-连接.png
基本概念:
Session:会话即客户端(由ClientId作为标示)和服务端之间逻辑层面的通信;生命周期(存在时间):会话 >= 网络连接。
ClientID:客户端唯一标识,服务端用于关联一个Session
只能包含这些 大写字母,小写字母 和 数字(0-9a-zA-Z),23个字符以内
如果 ClientID 在多次 TCP连接中保持一致,客户端和服务器端会保留会话信息(Session)
同一时间内 Server 和同一个 ClientID 只能保持一个 TCP 连接,再次连接会踢掉前一个。
CleanSession:在Connect时,由客户端设置
Keep Alive:目的是保持长连接的可靠性,以及双方对彼此是否在线的确认。
客户端在Connect的时候设置 Keep Alive 时长。如果服务端在 1.5 * KeepAlive 时间内没有收到客户端的报文,它必须断开客户端的网络连接
Keep Alive 的值由具体应用指定,一般是几分钟。允许的最大值是 18 小时 12 分 15 秒。
Will:遗嘱消息(Will Message)存储在服务端,当网络连接关闭时,服务端必须发布这个遗嘱消息,所以被形象地称之为遗嘱,可用于通知异常断线。
客户端发送 DISCONNECT 关闭链接,遗嘱失效并删除
遗嘱消息发布的条件,包括:
服务端检测到了一个 I/O 错误或者网络故障
客户端在保持连接(Keep Alive)的时间内未能通讯
客户端没有先发送 DISCONNECT 报文直接关闭了网络连接
由于协议错误服务端关闭了网络连接
相关设置项,需要在Connect时,由客户端指定。
Will Flag :遗嘱的总开关
Will QoS: 遗嘱消息 QoS可取值 0、1、2,含义与消息QoS相同
Will Retain:遗嘱是否保留
Will Topic:遗嘱话题
Will Payload:遗嘱消息内容
连接流程
public void processConnect(Channel channel, MqttConnectMessage msg) {
MqttConnectPayload payload = msg.payload();
String clientId = payload.clientIdentifier();
final String username = payload.userName();
LOG.debug("Processing CONNECT message. CId={}, username={}", clientId, username);
// 1. 判断客户端连接时发送的MQTT协议版本号,非3.1和3.1.1版本发送协议不支持响应报文并在发送完成后关闭连接
if (isNotProtocolVersion(msg, MqttVersion.MQTT_3_1) && isNotProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) {
MqttConnAckMessage badProto = connAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
LOG.error("MQTT protocol version is not valid. CId={}", clientId);
channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
final boolean cleanSession = msg.variableHeader().isCleanSession();
if (clientId == null || clientId.length() == 0) {
// 2. 在客户端配置了cleanSession=false 或者服务端不允许clientId不存在的情况下客户端如果未上传clientId发送协议不支持响应报文并在发送完成后关闭连接
if (!cleanSession || !this.allowZeroByteClientId) {
MqttConnAckMessage badId = connAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
channel.writeAndFlush(badId).addListener(FIRE_EXCEPTION_ON_FAILURE);
channel.close().addListener(CLOSE_ON_FAILURE);
LOG.error("MQTT client ID cannot be empty. Username={}", username);
return;
}
// Generating client id.
clientId = UUID.randomUUID().toString().replace("-", "");
LOG.info("Client has connected with server generated id={}, username={}", clientId, username);
}
// 3. 判断用户名和密码是否合法
if (!login(channel, msg, clientId)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
//4.初始化连接对象并将连接对象引用放入连接管理中,如果发现连接管理中存在相同客户端ID的对象则关闭前一个连接并将新的连接 对象放入连接管理中
ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
if (existing != null) {
LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
existing.abort();
//return;
this.connectionDescriptors.removeConnection(existing);
this.connectionDescriptors.addConnection(descriptor);
}
if (!descriptor.assignState(DISCONNECTED, INIT_SESSION)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
LOG.debug("Initializing client session {}", clientId);
ClientSession existingSession = this.sessionsRepository.sessionForClient(clientId);
boolean isSessionAlreadyStored = existingSession != null;
final boolean msgCleanSessionFlag = msg.variableHeader().isCleanSession();
if (isSessionAlreadyStored && msgCleanSessionFlag) {
for (Subscription existingSub : existingSession.getSubscriptions()) {
this.subscriptions.removeSubscription(existingSub.getTopicFilter(), clientId);
}
}
// 5. 根据客户端上传的心跳时间调整服务端当前连接的心跳判断时间(keepAlive * 1.5f)
initializeKeepAliveTimeout(channel, msg, clientId);
final ClientSession clientSession = this.sessionsRepository.createOrLoadClientSession(clientId, cleanSession);
// 6. 遗嘱消息存储(当连接意外断开时向存储的主题发布消息)
clientSession.storeWillMessage(msg, clientId);
int flushIntervalMs = 500/* (keepAlive * 1000) / 2 */;
setupAutoFlusher(channel, flushIntervalMs);
if (!cleanSession && reauthorizeSubscriptionsOnConnect) {
reauthorizeOnExistingSubscriptions(clientId, username);
}
if (!descriptor.assignState(INIT_SESSION, SENDACK)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
LOG.debug("Sending CONNACK. CId={}", clientId);
MqttConnAckMessage okResp = createConnectAck(msg, clientId);
final String connectClientId = clientId;
// 7. 发送连接成功响应
descriptor.writeAndFlush(okResp, new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
LOG.debug("CONNACK has been sent. CId={}", connectClientId);
if (!descriptor.assignState(SENDACK, MESSAGES_REPUBLISHED)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
m_interceptor.notifyClientConnected(msg);
if (!msg.variableHeader().isCleanSession()) {
// force the republish of stored QoS1 and QoS2
internalRepublisher.publishStored(clientSession);
}
if (!descriptor.assignState(MESSAGES_REPUBLISHED, ESTABLISHED)) {
channel.close().addListener(CLOSE_ON_FAILURE);
}
LOG.info("Connected client <{}> with login <{}>", connectClientId, username);
} else {
future.channel().pipeline().fireExceptionCaught(future.cause());
}
}
});
}
private boolean isNotProtocolVersion(MqttConnectMessage msg, MqttVersion version) {
return msg.variableHeader().version() != version.protocolLevel();
}
MQTT-订阅.png
基本概念
订阅流程
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.debug("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID);
RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
if (currentStatus != null) {
LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}",
clientID, messageID);
return;
}
String username = NettyUtils.userName(channel);
// 1、订阅的主题校验(权限、主题path合法性)
List<MqttTopicSubscription> ackTopics = doVerify(clientID, username, msg);
MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);
if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, " +
"messageId={}", clientID, messageID);
return;
}
LOG.debug("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics);
// 2、在当前session中存储订阅的主题
List<Subscription> newSubscriptions = doStoreSubscription(ackTopics, clientID);
// save session, persist subscriptions from session
// 3、采用全局tree结构存储订阅信息(主题和订阅者信息),用于消息转发时根据主题查找到对应的订阅者
for (Subscription subscription : newSubscriptions) {
subscriptions.add(subscription);
}
LOG.debug("Sending SUBACK response CId={}, messageId={}", clientID, messageID);
// 4、发送订阅回应
channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
// fire the persisted messages in session
// 5、扫描持久化的消息匹配到当前订阅主题的立即向此连接发送消息
for (Subscription subscription : newSubscriptions) {
publishRetainedMessagesInSession(subscription, username);
}
boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED);
if (!success) {
LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, messageID);
} else {
LOG.info("Client <{}> subscribed to topics", clientID);
}
}基本概念
Packet Identifier:报文标识存在报文的可变报头部分,非零两个字节整数 (0-65535]。
一个流程中重复:这些报文包含PacketID,而且在一次通信流程内保持一致:PUBLISH(QoS>0时)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCIBE、UNSUBACK 。
新的不重复:客户端每次发送一个新的这些类型的报文时都必须分配一个当前 未使用的PacketID
当客户端处理完这个报文对应的确认后,这个报文标识符就释放可重用。
独立维护:客户端和服务端彼此独立地分配报文标识符。因此,客户端服务端组合使用相同的报文标识符可以实现并发的消息交换。客户端和服务端产生的Packet Identifier一致不算异常。
Payload: 有效载荷即消息体最大允许 256MB。
Publish 的 Payload 允许为空,在很多场合下代表将持久消息(或者遗嘱消息)清空。采用UTF-8编码。
Retain:持久消息(粘性消息)
RETAIN 标记:每个Publish消息都需要指定的标记
每个Topic只会保留最多一个 Retain 持久消息
客户端订阅带有持久消息的Topic,会立即受到这条消息。
服务器可以选择丢弃持久消息,比如内存或者存储吃紧的时候。
如果客户端想要删除某个Topic 上面的持久消息,可以向这个Topic发送一个Payload为空的持久消息
遗嘱消息(Will)的Retain持久机制同理。
QoS :服务等级(消息可靠性)
发布流程
public void processPublish(Channel channel, MqttPublishMessage msg) {
final MqttQoS qos = msg.fixedHeader().qosLevel();
final String clientId = NettyUtils.clientID(channel);
LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}", clientId,
msg.variableHeader().topicName(), msg.variableHeader().messageId(), qos);
switch (qos) {
case AT_MOST_ONCE:
this.qos0PublishHandler.receivedPublishQos0(channel, msg);
break;
case AT_LEAST_ONCE:
this.qos1PublishHandler.receivedPublishQos1(channel, msg);
break;
case EXACTLY_ONCE:
this.qos2PublishHandler.receivedPublishQos2(channel, msg);
break;
default:
LOG.error("Unknown QoS-Type:{}", qos);
break;
}
}从上述代码的switch语句中可以看出会根据消息的Qos级别分别进行处理
QoS0 最多一次

void receivedPublishQos0(Channel channel, MqttPublishMessage msg) {
// verify if topic can be write
final Topic topic = new Topic(msg.variableHeader().topicName());
String clientID = NettyUtils.clientID(channel);
String username = NettyUtils.userName(channel);
// 1. 权限判断
if (!m_authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
return;
}
// route message to subscribers
IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
toStoreMsg.setClientID(clientID);
// 2. 向所有该主题的订阅者发布消息
this.publisher.publish2Subscribers(toStoreMsg, topic);
if (msg.fixedHeader().isRetain()) {
// 3. QoS == 0 && retain => clean old retained
m_messagesStore.cleanRetained(topic);
}
m_interceptor.notifyTopicPublished(msg, clientID, username);
}QoS1 至少一次

1.发送消息PUBLISH
void receivedPublishQos1(Channel channel, MqttPublishMessage msg) {
// verify if topic can be write
final Topic topic = new Topic(msg.variableHeader().topicName());
topic.getTokens();
if (!topic.isValid()) {
LOG.warn("Invalid topic format, force close the connection");
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
String clientID = NettyUtils.clientID(channel);
String username = NettyUtils.userName(channel);
// 1. 权限判断
if (!m_authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
return;
}
final int messageID = msg.variableHeader().messageId();
// route message to subscribers
IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
toStoreMsg.setClientID(clientID);
// 2. 向所有该主题的订阅者发布消息(每个session中存储即将要发送的消息)
this.publisher.publish2Subscribers(toStoreMsg, topic, messageID);
// 3. 发送Ack回应
sendPubAck(clientID, messageID);
// 4. retain = true => 存储消息
if (msg.fixedHeader().isRetain()) {
if (!msg.payload().isReadable()) {
m_messagesStore.cleanRetained(topic);
} else {
// before wasn't stored
m_messagesStore.storeRetained(topic, toStoreMsg);
}
}
m_interceptor.notifyTopicPublished(msg, clientID, username);
}2.1发送消息回应PUBACK
服务端Server接收到PUBACK消息后将执行:
public void processPubAck(Channel channel, MqttPubAckMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = msg.variableHeader().messageId();
String username = NettyUtils.userName(channel);
LOG.trace("retrieving inflight for messageID <{}>", messageID);
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
StoredMessage inflightMsg = targetSession.inFlightAcknowledged(messageID);
String topic = inflightMsg.getTopic();
InterceptAcknowledgedMessage wrapped = new InterceptAcknowledgedMessage(inflightMsg, topic, username,
messageID);
m_interceptor.notifyMessageAcknowledged(wrapped);
}QoS2 有且仅有一次

1.发送消息PUBLISH
void receivedPublishQos2(Channel channel, MqttPublishMessage msg) {
final Topic topic = new Topic(msg.variableHeader().topicName());
// check if the topic can be wrote
String clientID = NettyUtils.clientID(channel);
String username = NettyUtils.userName(channel);
// 1. 权限判断
if (!m_authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
return;
}
final int messageID = msg.variableHeader().messageId();
// 2. 存储消息
IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
toStoreMsg.setClientID(clientID);
LOG.info("Sending publish message to subscribers CId={}, topic={}, messageId={}", clientID, topic, messageID);
if (LOG.isTraceEnabled()) {
LOG.trace("payload={}, subs Tree={}", payload2Str(toStoreMsg.getPayload()), subscriptions.dumpTree());
}
this.sessionsRepository.sessionForClient(clientID).markAsInboundInflight(messageID, toStoreMsg);
// 3. 发送Rec回应
sendPubRec(clientID, messageID);
// Next the client will send us a pub rel
// NB publish to subscribers for QoS 2 happen upon PUBREL from publisher
// if (msg.fixedHeader().isRetain()) {
// if (msg.payload().readableBytes() == 0) {
// m_messagesStore.cleanRetained(topic);
// } else {
// m_messagesStore.storeRetained(topic, toStoreMsg);
// }
// }
//TODO this should happen on PUB_REL, else we notify false positive
m_interceptor.notifyTopicPublished(msg, clientID, username);
}2.发送消息Rel
void processPubRel(Channel channel, MqttMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.info("Processing PUBREL message. CId={}, messageId={}", clientID, messageID);
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
// 1. 删除消息
IMessagesStore.StoredMessage evt = targetSession.inboundInflight(messageID);
if (evt == null) {
LOG.warn("Can't find inbound inflight message for CId={}, messageId={}", clientID, messageID);
throw new IllegalArgumentException("Can't find inbound inflight message");
}
final Topic topic = new Topic(evt.getTopic());
// 2. 转发消息
this.publisher.publish2Subscribers(evt, topic, messageID);
if (evt.isRetained()) {
if (evt.getPayload().readableBytes() == 0) {
m_messagesStore.cleanRetained(topic);
} else {
m_messagesStore.storeRetained(topic, evt);
}
}
//TODO here we should notify to the listeners
//m_interceptor.notifyTopicPublished(msg, clientID, username);
// 3.发送Comp 回应
sendPubComp(clientID, messageID);
}3.发送消息回应Rec
public void processPubRec(Channel channel, MqttMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
// remove from the inflight and move to the QoS2 second phase queue
// 1. 删除消息
StoredMessage ackedMsg = targetSession.inFlightAcknowledged(messageID);
// 2. 存储消息(分别存储在secondPhaseStore和outboundInflightMap)
targetSession.moveInFlightToSecondPhaseAckWaiting(messageID, ackedMsg);
// once received a PUBREC reply with a PUBREL(messageID)
LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
// 3. 发送PUBREL
MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);
MqttMessage pubRelMessage = new MqttMessage(pubRelHeader, from(messageID));
channel.writeAndFlush(pubRelMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
}3.4发送消息回应Comp
public void processPubComp(Channel channel, MqttMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.debug("Processing PUBCOMP message. CId={}, messageId={}", clientID, messageID);
// once received the PUBCOMP then remove the message from the temp memory
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
// 1. 删除消息
StoredMessage inflightMsg = targetSession.completeReleasedPublish(messageID);
String username = NettyUtils.userName(channel);
String topic = inflightMsg.getTopic();
final InterceptAcknowledgedMessage interceptAckMsg = new InterceptAcknowledgedMessage(inflightMsg, topic,
username, messageID);
m_interceptor.notifyMessageAcknowledged(interceptAckMsg);
}基本概念
Topic 话题 和 TopicFilter 话题过滤器
Pub-Sub消息模型的核心机制
UTF-8 编码字符串,不能超过 65535 字节。层级数量没有限制
不能包含任何的下文中提到的特殊符号(/、+、#),必须至少包含一个字符
区分大小写,可以包含空格,不能包含空字符 (Unicode U+0000)
在收部或尾部增加 斜杠 “/”,会产生不同的Topic和TopicFilter。举例:
只包含斜杠 “/” 的 Topic 或 TopicFilter 是合法的
TopicFilter中的特殊符号
层级分隔符 /
用于分割主题的每个层级,为主题名提供一个分层结构
主题层级分隔符可以出现在 Topic 或 TopicFilter 的任何位置
特例:相邻的主题层次分隔符表示一个零长度的主题层级
单层通配符 +
只能用于单个主题层级匹配的通配符。例如,“a/b/+” 匹配 “a/b/c1” 和 “a/b/c2” ,但是不匹配 “a/b/c/d”
可以匹配 任意层级,包括第一个和最后一个层级。
例如,“+” 是有效的,“sport/+/player1” 也是有效的。
可以在多个层级中使用它,也可以和多层通配符一起使用。
例如,“+/tennis/#” 是有效的。只能匹配本级不能匹配上级。
例如,“sport/+” 不匹配 “sport” 但是却匹配“sport/”,“/finance” 匹配 “+/+” 和 “/+” ,但是不匹配 “+”。
多层通配符 #
用于匹配主题中任意层级的通配符
匹配包含本身的层级和子层级。
例如 “a/b/c/#" 可以匹配 “a/b/c”、“a/b/c/d” 和 “a/b/c/d/e”
必须是最后的结尾。
例如 “sport/tennis/#/ranking”是无效的
“#”是有效的,会收到所有的应用消息。 (服务器端应将此类 TopicFilter禁掉 )
以$开头的,服务器保留
服务端不能将 $ 字符开头的 Topic 匹配通配符 (#或+) 开头的 TopicFilter
服务端应该阻止客户端使用这种 Topic 与其它客户端交换消息。
服务端实现可以将 $ 开头的主题名用作其他目的。
SYS/ 被广泛用作包含服务器特定信息或控制接口的主题的前缀 客户端不特意订阅开头的 Topic,就不会收到对应的消息
如果客户端想同时接受以 “SYS/” 开头主题的消息和不以 开头主题的消息,它需要同时 订阅 “#” 和 “$SYS/#”
这4个主题会存储成如下结构:

查找算法
订阅
@Override
public void add(Subscription newSubscription) {
Action res;
do {
res = insert(newSubscription.clientId, newSubscription.topicFilter, this.root, newSubscription.topicFilter);
} while (res == Action.REPEAT);
}
private Action insert(String clientId, Topic topic, final INode inode, Topic fullpath) {
Token token = topic.headToken();
if (!topic.isEmpty() && inode.mainNode().anyChildrenMatch(token)) {
Topic remainingTopic = topic.exceptHeadToken();
INode nextInode = inode.mainNode().childOf(token);
return insert(clientId, remainingTopic, nextInode, fullpath);
} else {
if (topic.isEmpty()) {
return insertSubscription(clientId, fullpath, inode);
} else {
return createNodeAndInsertSubscription(clientId, topic, inode, fullpath);
}
}
}删除订阅
/**
* Removes subscription from CTrie, adds TNode when the last client unsubscribes, then calls for cleanTomb in a
* separate atomic CAS operation.
*
* @param topic
* @param clientID
*/
@Override
public void removeSubscription(Topic topic, String clientID) {
Action res;
do {
res = remove(clientID, topic, this.root, NO_PARENT);
} while (res == Action.REPEAT);
}
private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
Token token = topic.headToken();
if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
Topic remainingTopic = topic.exceptHeadToken();
INode nextInode = inode.mainNode().childOf(token);
return remove(clientId, remainingTopic, nextInode, inode);
} else {
final CNode cnode = inode.mainNode();
if (cnode instanceof TNode) {
// this inode is a tomb, has no clients and should be cleaned up
// Because we implemented cleanTomb below, this should be rare, but possible
// Consider calling cleanTomb here too
return Action.OK;
}
if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
// last client to leave this node, AND there are no downstream children, remove via TNode tomb
if (inode == this.root) {
return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
}
TNode tnode = new TNode();
return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
} else if (cnode.contains(clientId) && topic.isEmpty()) {
CNode updatedCnode = cnode.copy();
updatedCnode.removeSubscriptionsFor(clientId);
return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
} else {
//someone else already removed
return Action.OK;
}
}
}查找
Set<Subscription> recursiveMatch(Topic topic, INode inode) {
CNode cnode = inode.mainNode();
if (Token.MULTI.equals(cnode.token)) {
return cnode.subscriptions;
}
if (topic.isEmpty()) {
return Collections.emptySet();
}
if (cnode instanceof TNode) {
return Collections.emptySet();
}
final Token token = topic.headToken();
if (!(Token.SINGLE.equals(cnode.token) || cnode.token.equals(token) || ROOT.equals(cnode.token))) {
return Collections.emptySet();
}
Topic remainingTopic = (ROOT.equals(cnode.token)) ? topic : topic.exceptHeadToken();
Set<Subscription> subscriptions = new HashSet<>();
if (remainingTopic.isEmpty()) {
subscriptions.addAll(cnode.subscriptions);
}
for (INode subInode : cnode.allChildren()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode));
}
return subscriptions;
}