wxuande 2018-11-25
客户端-A 给 客户端-B 发送消息“hello”流程如下:
服务端Server又称 MQTT Broker 即订阅和发送的中间人
在上述的客户端-A 给 客户端-B 发送消息“hello”流程中需要有如下动作。
Session:会话即客户端(由ClientId作为标识)和服务端之间逻辑层面的通信;生命周期(存在时间):会话 >= 网络连接。
ClientId 只能包含大写字母、小写字母和数字(0-9a-zA-Z),长度在 23 个字符以内
如果 ClientID 在多次 TCP连接中保持一致,客户端和服务器端会保留会话信息(Session)
同一时间内 Server 和同一个 ClientID 只能保持一个 TCP 连接,再次连接会踢掉前一个。
Keep Alive:目的是保持长连接的可靠性,以及双方对彼此是否在线的确认。
客户端在Connect的时候设置 Keep Alive 时长。如果服务端在 1.5 * KeepAlive 时间内没有收到客户端的报文,它必须断开客户端的网络连接
Keep Alive 的值由具体应用指定,一般是几分钟。允许的最大值是 18 小时 12 分 15 秒。
Will:遗嘱消息(Will Message)存储在服务端,当网络连接关闭时,服务端必须发布这个遗嘱消息,所以被形象地称之为遗嘱,可用于通知异常断线。
客户端发送 DISCONNECT 关闭连接,遗嘱失效并删除
服务端检测到了一个 I/O 错误或者网络故障
客户端在保持连接(Keep Alive)的时间内未能通讯
客户端没有先发送 DISCONNECT 报文直接关闭了网络连接
Will Flag :遗嘱的总开关
Will QoS: 遗嘱消息 QoS可取值 0、1、2,含义与消息QoS相同
Will Retain:遗嘱是否保留
Will Topic:遗嘱话题
Will Payload:遗嘱消息内容
public void processConnect(Channel channel, MqttConnectMessage msg) { MqttConnectPayload payload = msg.payload(); String clientId = payload.clientIdentifier(); final String username = payload.userName(); LOG.debug("Processing CONNECT message. CId={}, username={}", clientId, username); // 1. 判断客户端连接时发送的MQTT协议版本号,非3.1和3.1.1版本发送协议不支持响应报文并在发送完成后关闭连接 if (isNotProtocolVersion(msg, MqttVersion.MQTT_3_1) && isNotProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) { MqttConnAckMessage badProto = connAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION); LOG.error("MQTT protocol version is not valid. CId={}", clientId); channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE); channel.close().addListener(CLOSE_ON_FAILURE); return; } final boolean cleanSession = msg.variableHeader().isCleanSession(); if (clientId == null || clientId.length() == 0) { // 2. 在客户端配置了cleanSession=false 或者服务端不允许clientId不存在的情况下客户端如果未上传clientId发送协议不支持响应报文并在发送完成后关闭连接 if (!cleanSession || !this.allowZeroByteClientId) { MqttConnAckMessage badId = connAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED); channel.writeAndFlush(badId).addListener(FIRE_EXCEPTION_ON_FAILURE); channel.close().addListener(CLOSE_ON_FAILURE); LOG.error("MQTT client ID cannot be empty. Username={}", username); return; } // Generating client id. clientId = UUID.randomUUID().toString().replace("-", ""); LOG.info("Client has connected with server generated id={}, username={}", clientId, username); } // 3. 判断用户名和密码是否合法 if (!login(channel, msg, clientId)) { channel.close().addListener(CLOSE_ON_FAILURE); return; } //4.初始化连接对象并将连接对象引用放入连接管理中,如果发现连接管理中存在相同客户端ID的对象则关闭前一个连接并将新的连接 对象放入连接管理中 ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession); final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor); if (existing != null) { LOG.info("Client ID is being used in an existing connection, force to be closed. 
CId={}", clientId); existing.abort(); //return; this.connectionDescriptors.removeConnection(existing); this.connectionDescriptors.addConnection(descriptor); } if (!descriptor.assignState(DISCONNECTED, INIT_SESSION)) { channel.close().addListener(CLOSE_ON_FAILURE); return; } LOG.debug("Initializing client session {}", clientId); ClientSession existingSession = this.sessionsRepository.sessionForClient(clientId); boolean isSessionAlreadyStored = existingSession != null; final boolean msgCleanSessionFlag = msg.variableHeader().isCleanSession(); if (isSessionAlreadyStored && msgCleanSessionFlag) { for (Subscription existingSub : existingSession.getSubscriptions()) { this.subscriptions.removeSubscription(existingSub.getTopicFilter(), clientId); } } // 5. 根据客户端上传的心跳时间调整服务端当前连接的心跳判断时间(keepAlive * 1.5f) initializeKeepAliveTimeout(channel, msg, clientId); final ClientSession clientSession = this.sessionsRepository.createOrLoadClientSession(clientId, cleanSession); // 6. 遗嘱消息存储(当连接意外断开时向存储的主题发布消息) clientSession.storeWillMessage(msg, clientId); int flushIntervalMs = 500/* (keepAlive * 1000) / 2 */; setupAutoFlusher(channel, flushIntervalMs); if (!cleanSession && reauthorizeSubscriptionsOnConnect) { reauthorizeOnExistingSubscriptions(clientId, username); } if (!descriptor.assignState(INIT_SESSION, SENDACK)) { channel.close().addListener(CLOSE_ON_FAILURE); return; } LOG.debug("Sending CONNACK. CId={}", clientId); MqttConnAckMessage okResp = createConnectAck(msg, clientId); final String connectClientId = clientId; // 7. 发送连接成功响应 descriptor.writeAndFlush(okResp, new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { LOG.debug("CONNACK has been sent. 
CId={}", connectClientId); if (!descriptor.assignState(SENDACK, MESSAGES_REPUBLISHED)) { channel.close().addListener(CLOSE_ON_FAILURE); return; } m_interceptor.notifyClientConnected(msg); if (!msg.variableHeader().isCleanSession()) { // force the republish of stored QoS1 and QoS2 internalRepublisher.publishStored(clientSession); } if (!descriptor.assignState(MESSAGES_REPUBLISHED, ESTABLISHED)) { channel.close().addListener(CLOSE_ON_FAILURE); } LOG.info("Connected client <{}> with login <{}>", connectClientId, username); } else { future.channel().pipeline().fireExceptionCaught(future.cause()); } } }); } private boolean isNotProtocolVersion(MqttConnectMessage msg, MqttVersion version) { return msg.variableHeader().version() != version.protocolLevel(); }
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) { String clientID = NettyUtils.clientID(channel); int messageID = messageId(msg); LOG.debug("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID); RunningSubscription executionKey = new RunningSubscription(clientID, messageID); SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED); if (currentStatus != null) { LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}", clientID, messageID); return; } String username = NettyUtils.userName(channel); // 1、订阅的主题校验(权限、主题path合法性) List<MqttTopicSubscription> ackTopics = doVerify(clientID, username, msg); MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID); if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) { LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, " + "messageId={}", clientID, messageID); return; } LOG.debug("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics); // 2、在当前session中存储订阅的主题 List<Subscription> newSubscriptions = doStoreSubscription(ackTopics, clientID); // save session, persist subscriptions from session // 3、采用全局tree结构存储订阅信息(主题和订阅者信息),用于消息转发时根据主题查找到对应的订阅者 for (Subscription subscription : newSubscriptions) { subscriptions.add(subscription); } LOG.debug("Sending SUBACK response CId={}, messageId={}", clientID, messageID); // 4、发送订阅回应 channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE); // fire the persisted messages in session // 5、扫描持久化的消息匹配到当前订阅主题的立即向此连接发送消息 for (Subscription subscription : newSubscriptions) { publishRetainedMessagesInSession(subscription, username); } boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED); if (!success) { LOG.warn("Unable to perform 
the final subscription state update CId={}, messageId={}", clientID, messageID); } else { LOG.info("Client <{}> subscribed to topics", clientID); } }
Packet Identifier:报文标识存在报文的可变报头部分,非零两个字节整数 (0-65535]。
新的不重复:客户端每次发送一个新的这些类型的报文时都必须分配一个当前 未使用的PacketID
独立维护:客户端和服务端彼此独立地分配报文标识符。因此,客户端服务端组合使用相同的报文标识符可以实现并发的消息交换。客户端和服务端产生的Packet Identifier一致不算异常。
Payload: 有效载荷即消息体最大允许 256MB。
Publish 的 Payload 允许为空,在很多场合下代表将持久消息(或者遗嘱消息)清空。采用UTF-8编码。
RETAIN 标记:每个Publish消息都需要指定的标记
每个Topic只会保留最多一个 Retain 持久消息
如果客户端想要删除某个Topic 上面的持久消息,可以向这个Topic发送一个Payload为空的持久消息
QoS :服务等级(消息可靠性)
/**
 * Dispatches an incoming PUBLISH packet to the handler matching its QoS level.
 *
 * @param channel the channel the PUBLISH packet arrived on
 * @param msg     the decoded PUBLISH packet
 */
public void processPublish(Channel channel, MqttPublishMessage msg) {
    final String clientId = NettyUtils.clientID(channel);
    final MqttQoS qos = msg.fixedHeader().qosLevel();
    LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}",
            clientId,
            msg.variableHeader().topicName(),
            msg.variableHeader().messageId(),
            qos);

    switch (qos) {
        case AT_MOST_ONCE:
            // QoS 0
            this.qos0PublishHandler.receivedPublishQos0(channel, msg);
            break;
        case AT_LEAST_ONCE:
            // QoS 1
            this.qos1PublishHandler.receivedPublishQos1(channel, msg);
            break;
        case EXACTLY_ONCE:
            // QoS 2
            this.qos2PublishHandler.receivedPublishQos2(channel, msg);
            break;
        default:
            // Any other value is only logged; the packet is not processed.
            LOG.error("Unknown QoS-Type:{}", qos);
            break;
    }
}
QoS0 最多一次
void receivedPublishQos0(Channel channel, MqttPublishMessage msg) { // verify if topic can be write final Topic topic = new Topic(msg.variableHeader().topicName()); String clientID = NettyUtils.clientID(channel); String username = NettyUtils.userName(channel); // 1. 权限判断 if (!m_authorizator.canWrite(topic, username, clientID)) { LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic); return; } // route message to subscribers IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg); toStoreMsg.setClientID(clientID); // 2. 向所有该主题的订阅者发布消息 this.publisher.publish2Subscribers(toStoreMsg, topic); if (msg.fixedHeader().isRetain()) { // 3. QoS == 0 && retain => clean old retained m_messagesStore.cleanRetained(topic); } m_interceptor.notifyTopicPublished(msg, clientID, username); }
QoS1 至少一次
void receivedPublishQos1(Channel channel, MqttPublishMessage msg) { // verify if topic can be write final Topic topic = new Topic(msg.variableHeader().topicName()); topic.getTokens(); if (!topic.isValid()) { LOG.warn("Invalid topic format, force close the connection"); channel.close().addListener(CLOSE_ON_FAILURE); return; } String clientID = NettyUtils.clientID(channel); String username = NettyUtils.userName(channel); // 1. 权限判断 if (!m_authorizator.canWrite(topic, username, clientID)) { LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic); return; } final int messageID = msg.variableHeader().messageId(); // route message to subscribers IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg); toStoreMsg.setClientID(clientID); // 2. 向所有该主题的订阅者发布消息(每个session中存储即将要发送的消息) this.publisher.publish2Subscribers(toStoreMsg, topic, messageID); // 3. 发送Ack回应 sendPubAck(clientID, messageID); // 4. retain = true => 存储消息 if (msg.fixedHeader().isRetain()) { if (!msg.payload().isReadable()) { m_messagesStore.cleanRetained(topic); } else { // before wasn't stored m_messagesStore.storeRetained(topic, toStoreMsg); } } m_interceptor.notifyTopicPublished(msg, clientID, username); }
/**
 * Handles a PUBACK packet: takes the acknowledged message out of the session's in-flight
 * store and notifies the interceptors.
 *
 * <p>A PUBACK that cannot be matched (no session for the client, or no in-flight message
 * with that id, e.g. a duplicate acknowledgement) is logged and ignored instead of failing
 * with a {@link NullPointerException}.
 *
 * @param channel the channel the PUBACK packet arrived on
 * @param msg     the decoded PUBACK packet
 */
public void processPubAck(Channel channel, MqttPubAckMessage msg) {
    String clientID = NettyUtils.clientID(channel);
    int messageID = msg.variableHeader().messageId();
    String username = NettyUtils.userName(channel);
    LOG.trace("retrieving inflight for messageID <{}>", messageID);

    ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
    if (targetSession == null) {
        // sessionForClient may return null (processConnect checks for it as well).
        LOG.warn("Can't find session for PUBACK. CId={}, messageId={}", clientID, messageID);
        return;
    }

    StoredMessage inflightMsg = targetSession.inFlightAcknowledged(messageID);
    if (inflightMsg == null) {
        // Unknown or already acknowledged message id: nothing to report to the interceptors.
        LOG.warn("Can't find inflight message for PUBACK. CId={}, messageId={}", clientID, messageID);
        return;
    }

    String topic = inflightMsg.getTopic();
    InterceptAcknowledgedMessage wrapped =
            new InterceptAcknowledgedMessage(inflightMsg, topic, username, messageID);
    m_interceptor.notifyMessageAcknowledged(wrapped);
}
QoS2 有且仅有一次
void receivedPublishQos2(Channel channel, MqttPublishMessage msg) { final Topic topic = new Topic(msg.variableHeader().topicName()); // check if the topic can be wrote String clientID = NettyUtils.clientID(channel); String username = NettyUtils.userName(channel); // 1. 权限判断 if (!m_authorizator.canWrite(topic, username, clientID)) { LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic); return; } final int messageID = msg.variableHeader().messageId(); // 2. 存储消息 IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg); toStoreMsg.setClientID(clientID); LOG.info("Sending publish message to subscribers CId={}, topic={}, messageId={}", clientID, topic, messageID); if (LOG.isTraceEnabled()) { LOG.trace("payload={}, subs Tree={}", payload2Str(toStoreMsg.getPayload()), subscriptions.dumpTree()); } this.sessionsRepository.sessionForClient(clientID).markAsInboundInflight(messageID, toStoreMsg); // 3. 发送Rec回应 sendPubRec(clientID, messageID); // Next the client will send us a pub rel // NB publish to subscribers for QoS 2 happen upon PUBREL from publisher // if (msg.fixedHeader().isRetain()) { // if (msg.payload().readableBytes() == 0) { // m_messagesStore.cleanRetained(topic); // } else { // m_messagesStore.storeRetained(topic, toStoreMsg); // } // } //TODO this should happen on PUB_REL, else we notify false positive m_interceptor.notifyTopicPublished(msg, clientID, username); }
void processPubRel(Channel channel, MqttMessage msg) { String clientID = NettyUtils.clientID(channel); int messageID = messageId(msg); LOG.info("Processing PUBREL message. CId={}, messageId={}", clientID, messageID); ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID); // 1. 删除消息 IMessagesStore.StoredMessage evt = targetSession.inboundInflight(messageID); if (evt == null) { LOG.warn("Can't find inbound inflight message for CId={}, messageId={}", clientID, messageID); throw new IllegalArgumentException("Can't find inbound inflight message"); } final Topic topic = new Topic(evt.getTopic()); // 2. 转发消息 this.publisher.publish2Subscribers(evt, topic, messageID); if (evt.isRetained()) { if (evt.getPayload().readableBytes() == 0) { m_messagesStore.cleanRetained(topic); } else { m_messagesStore.storeRetained(topic, evt); } } //TODO here we should notify to the listeners //m_interceptor.notifyTopicPublished(msg, clientID, username); // 3.发送Comp 回应 sendPubComp(clientID, messageID); }
public void processPubRec(Channel channel, MqttMessage msg) { String clientID = NettyUtils.clientID(channel); int messageID = messageId(msg); LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID); ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID); // remove from the inflight and move to the QoS2 second phase queue // 1. 删除消息 StoredMessage ackedMsg = targetSession.inFlightAcknowledged(messageID); // 2. 存储消息(分别存储在secondPhaseStore和outboundInflightMap) targetSession.moveInFlightToSecondPhaseAckWaiting(messageID, ackedMsg); // once received a PUBREC reply with a PUBREL(messageID) LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID); // 3. 发送PUBREL MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0); MqttMessage pubRelMessage = new MqttMessage(pubRelHeader, from(messageID)); channel.writeAndFlush(pubRelMessage).addListener(FIRE_EXCEPTION_ON_FAILURE); }
public void processPubComp(Channel channel, MqttMessage msg) { String clientID = NettyUtils.clientID(channel); int messageID = messageId(msg); LOG.debug("Processing PUBCOMP message. CId={}, messageId={}", clientID, messageID); // once received the PUBCOMP then remove the message from the temp memory ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID); // 1. 删除消息 StoredMessage inflightMsg = targetSession.completeReleasedPublish(messageID); String username = NettyUtils.userName(channel); String topic = inflightMsg.getTopic(); final InterceptAcknowledgedMessage interceptAckMsg = new InterceptAcknowledgedMessage(inflightMsg, topic, username, messageID); m_interceptor.notifyMessageAcknowledged(interceptAckMsg); }
Topic 话题 和 TopicFilter 话题过滤器
UTF-8 编码字符串,不能超过 65535 字节。层级数量没有限制
区分大小写,可以包含空格,不能包含空字符 (Unicode U+0000)
在首部或尾部增加斜杠 “/”,会产生不同的 Topic 和 TopicFilter。举例:
只包含斜杠 “/” 的 Topic 或 TopicFilter 是合法的
层级分隔符 /
主题层级分隔符可以出现在 Topic 或 TopicFilter 的任何位置
单层通配符 +
只能用于单个主题层级匹配的通配符。例如,“a/b/+” 匹配 “a/b/c1” 和 “a/b/c2” ,但是不匹配 “a/b/c/d”
可以匹配 任意层级,包括第一个和最后一个层级。
例如,“+” 是有效的,“sport/+/player1” 也是有效的。
例如,“+/tennis/#” 是有效的。只能匹配本级不能匹配上级。
例如,“sport/+” 不匹配 “sport” 但是却匹配“sport/”,“/finance” 匹配 “+/+” 和 “/+” ,但是不匹配 “+”。
多层通配符 #
例如 “a/b/c/#” 可以匹配 “a/b/c”、“a/b/c/d” 和 “a/b/c/d/e”
例如 “sport/tennis/#/ranking”是无效的
“#”是有效的,会收到所有的应用消息。 (服务器端应将此类 TopicFilter禁掉 )
服务端不能将 $ 字符开头的 Topic 匹配通配符 (#或+) 开头的 TopicFilter
服务端应该阻止客户端使用这种 Topic 与其它客户端交换消息。
服务端实现可以将 $ 开头的主题名用作其他目的。
$SYS/ 被广泛用作包含服务器特定信息或控制接口的主题的前缀。客户端不特意订阅 $ 开头的 Topic,就不会收到对应的消息
如果客户端想同时接收以 “$SYS/” 开头主题的消息和不以 $ 开头主题的消息,它需要同时订阅 “#” 和 “$SYS/#”
/**
 * Adds a subscription to the trie, retrying the insertion for as long as it reports
 * {@code Action.REPEAT}.
 *
 * @param newSubscription the subscription to register
 */
@Override
public void add(Subscription newSubscription) {
    Action outcome = insert(newSubscription.clientId, newSubscription.topicFilter, this.root,
            newSubscription.topicFilter);
    while (outcome == Action.REPEAT) {
        outcome = insert(newSubscription.clientId, newSubscription.topicFilter, this.root,
                newSubscription.topicFilter);
    }
}

/**
 * Walks down the trie one topic token at a time and inserts the subscription at the node
 * matching the full path, creating the missing branch when the walk cannot continue.
 *
 * @param clientId the subscribing client
 * @param topic    the part of the topic filter still to be consumed
 * @param inode    the node the walk has reached
 * @param fullpath the complete topic filter of the subscription
 * @return the outcome reported by the recursive call or by the insertion helpers
 */
private Action insert(String clientId, Topic topic, final INode inode, Topic fullpath) {
    final Token head = topic.headToken();

    // Whole filter consumed: this node is where the subscription belongs.
    if (topic.isEmpty()) {
        return insertSubscription(clientId, fullpath, inode);
    }

    // A child already exists for the next token: descend into it.
    if (inode.mainNode().anyChildrenMatch(head)) {
        final Topic rest = topic.exceptHeadToken();
        final INode child = inode.mainNode().childOf(head);
        return insert(clientId, rest, child, fullpath);
    }

    // No child for the next token: create the node and insert the subscription.
    return createNodeAndInsertSubscription(clientId, topic, inode, fullpath);
}
/** * Removes subscription from CTrie, adds TNode when the last client unsubscribes, then calls for cleanTomb in a * separate atomic CAS operation. * * @param topic * @param clientID */ @Override public void removeSubscription(Topic topic, String clientID) { Action res; do { res = remove(clientID, topic, this.root, NO_PARENT); } while (res == Action.REPEAT); } private Action remove(String clientId, Topic topic, INode inode, INode iParent) { Token token = topic.headToken(); if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) { Topic remainingTopic = topic.exceptHeadToken(); INode nextInode = inode.mainNode().childOf(token); return remove(clientId, remainingTopic, nextInode, inode); } else { final CNode cnode = inode.mainNode(); if (cnode instanceof TNode) { // this inode is a tomb, has no clients and should be cleaned up // Because we implemented cleanTomb below, this should be rare, but possible // Consider calling cleanTomb here too return Action.OK; } if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) { // last client to leave this node, AND there are no downstream children, remove via TNode tomb if (inode == this.root) { return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT; } TNode tnode = new TNode(); return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT; } else if (cnode.contains(clientId) && topic.isEmpty()) { CNode updatedCnode = cnode.copy(); updatedCnode.removeSubscriptionsFor(clientId); return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT; } else { //someone else already removed return Action.OK; } } }
/**
 * Collects the subscriptions stored under {@code inode} that match {@code topic}.
 *
 * <p>A node holding the multi-level wildcard token returns its subscriptions directly. Otherwise
 * the node must match the head token of the topic (exactly, through the single-level wildcard,
 * or by being the root) and the search continues in all of its children with the remaining topic.
 *
 * @param topic the part of the published topic still to be matched
 * @param inode the node to match against
 * @return the matching subscriptions, possibly empty
 */
Set<Subscription> recursiveMatch(Topic topic, INode inode) {
    final CNode current = inode.mainNode();

    // Multi-level wildcard: everything below matches, no need to look further.
    if (Token.MULTI.equals(current.token)) {
        return current.subscriptions;
    }
    // Nothing left to match, or the node is a tomb.
    if (topic.isEmpty() || current instanceof TNode) {
        return Collections.emptySet();
    }

    final Token head = topic.headToken();
    final boolean levelMatches = Token.SINGLE.equals(current.token)
            || current.token.equals(head)
            || ROOT.equals(current.token);
    if (!levelMatches) {
        return Collections.emptySet();
    }

    // The root consumes no token; every other node consumes the head of the topic.
    final Topic rest;
    if (ROOT.equals(current.token)) {
        rest = topic;
    } else {
        rest = topic.exceptHeadToken();
    }

    final Set<Subscription> matched = new HashSet<>();
    if (rest.isEmpty()) {
        // The whole topic has been consumed at this node: its own subscriptions match.
        matched.addAll(current.subscriptions);
    }
    for (INode child : current.allChildren()) {
        matched.addAll(recursiveMatch(rest, child));
    }
    return matched;
}