1. MQTT server
Since ThingsBoard has to accept MQTT connections from devices, it must contain an MQTT server. The class that creates this server is MqttTransportService.
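It is a Netty-based MQTT server. It registers MqttTransportServerInitializer as the channel initializer, and that initializer adds Netty's MqttDecoder and MqttEncoder to the ChannelPipeline, so we can ignore MQTT encoding and decoding entirely. The important part is that it also adds MqttTransportHandler.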
2. MqttTransportHandler handles the connection
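In this example, we first create a tenant and a tenant administrator and add a device. We then use MQTT Box to simulate the hardware device: copy the device's ACCESS TOKEN into MQTT Box's Username field and connect to the ThingsBoard backend.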

Since SSL is not used, the following method is called once the connect request arrives:
```java
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
    String userName = msg.payload().userName();
    log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
    if (StringUtils.isEmpty(userName)) {
        ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
        ctx.close();
    } else {
        // Take the userName (the device access token), wrap it in a protobuf message
        // (easy to transmit and parse) and hand it to transportService.
        // process() is covered by the DefaultTransportService analysis in part 3 of this series;
        // the details follow below.
        transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
                new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                    @Override
                    public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
                        onValidateDeviceResponse(msg, ctx);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log.trace("[{}] Failed to process credentials: {}", address, userName, e);
                        ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
                        ctx.close();
                    }
                });
    }
}
```
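The possible CONNACK outcomes of this handshake (plus the NOT_AUTHORIZED case handled later in onValidateDeviceResponse) can be modelled in plain JDK code. This is a simplified sketch, not ThingsBoard's code: `ConnAckCode`, `TokenValidator` and `ConnectFlowSketch` are hypothetical names, `CompletableFuture` stands in for `TransportServiceCallback`, and the SessionEvent.OPEN step from onValidateDeviceResponse is left out.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Netty's MqttConnectReturnCode (only the codes used in this flow).
enum ConnAckCode {
    CONNECTION_ACCEPTED,
    CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD,
    CONNECTION_REFUSED_SERVER_UNAVAILABLE,
    CONNECTION_REFUSED_NOT_AUTHORIZED
}

// Hypothetical async token lookup standing in for transportService.process(...).
// An empty Optional means "no device is registered with this access token".
interface TokenValidator {
    CompletableFuture<Optional<String>> validate(String accessToken);
}

final class ConnectFlowSketch {
    private ConnectFlowSketch() {}

    static CompletableFuture<ConnAckCode> onConnect(String userName, TokenValidator validator) {
        if (userName == null || userName.isEmpty()) {
            // Rejected immediately, without asking the core service.
            return CompletableFuture.completedFuture(ConnAckCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        }
        return validator.validate(userName)
                // Lookup succeeded: accept only if device info came back.
                .thenApply(deviceInfo -> deviceInfo.isPresent()
                        ? ConnAckCode.CONNECTION_ACCEPTED
                        : ConnAckCode.CONNECTION_REFUSED_NOT_AUTHORIZED)
                // Lookup itself failed (e.g. queue error or timeout): report the server as unavailable.
                .exceptionally(e -> ConnAckCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
    }
}
```

The point of the callback style is that the Netty handler does not wait for the lookup; the CONNACK is written whenever the result arrives.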
DefaultTransportService's process method builds an asynchronous task: on success it invokes the onSuccess Consumer, on failure the onFailure Consumer. The actual user validation is delegated to transportApiRequestTemplate.send:

```java
public ListenableFuture<Response> send(Request request) {
    if (tickSize > maxPendingRequests) {
        return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
    }
    UUID requestId = UUID.randomUUID();
    request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
    // From the analysis in part 3: this topic is tb_transport.api.responses.localHostName
    request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
    request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
    // SettableFuture comes from Google's Guava library (see the basics in part 1):
    // a future whose result can be set explicitly
    SettableFuture<Response> future = SettableFuture.create();
    ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
    // ② Register the future in pendingRequests
    pendingRequests.putIfAbsent(requestId, responseMetaData);
    log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
    // Send the message to the queue; the topic is tb_transport.api.requests
    requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
        @Override
        public void onSuccess(TbQueueMsgMetadata metadata) {
            log.trace("[{}] Request sent: {}", requestId, metadata);
        }

        @Override
        public void onFailure(Throwable t) {
            pendingRequests.remove(requestId);
            future.setException(t);
        }
    });
    return future;
}
```
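The mechanism behind send(), and behind the response poller that completes the future, is a request/reply exchange over queues keyed by a correlation id. A minimal JDK-only sketch, assuming a `BlockingQueue` in place of the real message queue and `CompletableFuture` in place of Guava's `SettableFuture`; the class names and string header keys are hypothetical, and the expiry of pending requests (expTime) is left out:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical queue message: headers carry the correlation id and the reply-to topic.
final class QueueMsgSketch {
    final Map<String, String> headers = new ConcurrentHashMap<>();
    final String body;

    QueueMsgSketch(String body) {
        this.body = body;
    }
}

final class RequestTemplateSketch {
    // requestId -> future waiting for the reply (the role of pendingRequests)
    private final Map<UUID, CompletableFuture<QueueMsgSketch>> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<QueueMsgSketch> requestTopic; // stands in for tb_transport.api.requests
    private final String responseTopic;                       // e.g. tb_transport.api.responses.<host>
    private final int maxPending;

    RequestTemplateSketch(BlockingQueue<QueueMsgSketch> requestTopic, String responseTopic, int maxPending) {
        this.requestTopic = requestTopic;
        this.responseTopic = responseTopic;
        this.maxPending = maxPending;
    }

    CompletableFuture<QueueMsgSketch> send(QueueMsgSketch request) {
        if (pending.size() >= maxPending) {
            return CompletableFuture.failedFuture(new RuntimeException("Pending request map is full!"));
        }
        UUID requestId = UUID.randomUUID();
        request.headers.put("requestId", requestId.toString());
        request.headers.put("responseTopic", responseTopic);
        CompletableFuture<QueueMsgSketch> future = new CompletableFuture<>();
        // Register before sending, so that even a very fast reply finds its future.
        pending.put(requestId, future);
        if (!requestTopic.offer(request)) {
            // Mirrors the onFailure branch: forget the request and fail the future.
            pending.remove(requestId);
            future.completeExceptionally(new IllegalStateException("request topic is full"));
        }
        return future;
    }

    // Called by the response-polling loop for every message read from the response topic.
    boolean onResponse(QueueMsgSketch response) {
        String id = response.headers.get("requestId");
        if (id == null) {
            return false; // "Missing requestId in header and body"
        }
        CompletableFuture<QueueMsgSketch> future = pending.remove(UUID.fromString(id));
        if (future == null) {
            return false; // "Invalid or stale request": already answered or never sent from here
        }
        future.complete(response);
        return true;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because the future is removed from the map when the reply arrives, a duplicate or late reply simply finds nothing and is ignored.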
According to the analysis of TbCoreTransportApiService in part 3, the requestTemplate member of DefaultTbQueueResponseTemplate, i.e. its consumer, subscribes precisely to the tb_transport.api.requests messages:

```java
// ...
requests.forEach(request -> {
    long currentTime = System.currentTimeMillis();
    long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME));
    if (requestTime + requestTimeout >= currentTime) {
        byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER);
        if (requestIdHeader == null) {
            log.error("[{}] Missing requestId in header", request);
            return;
        }
        // Read the response topic from the header, so the reply goes back where the request came from;
        // here it is tb_transport.api.responses.localHostName
        byte[] responseTopicHeader = request.getHeaders().get(RESPONSE_TOPIC_HEADER);
        if (responseTopicHeader == null) {
            log.error("[{}] Missing response topic in header", request);
            return;
        }
        UUID requestId = bytesToUuid(requestIdHeader);
        String responseTopic = bytesToString(responseTopicHeader);
        try {
            pendingRequestCount.getAndIncrement();
            // Let the handler process the message
            AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
                    response -> {
                        pendingRequestCount.decrementAndGet();
                        response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
                        // Send the result of handler.handle back to the sender on tb_transport.api.responses.localHostName
                        responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
                    },
                    e -> {
                        pendingRequestCount.decrementAndGet();
                        if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
                            log.warn("[{}] Timeout to process the request: {}", requestId, request, e);
                        } else {
                            log.trace("[{}] Failed to process the request: {}", requestId, request, e);
                        }
                    },
                    requestTimeout,
                    timeoutExecutor,
                    callbackExecutor);
// ...
```
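The consuming side honours the same contract: drop requests already older than the timeout, copy the request id into the reply, and publish the reply to whatever topic the request named. A synchronous sketch under those assumptions (the real handler returns a `ListenableFuture` and runs under `AsyncCallbackTemplate` with a timeout); `ApiMsgSketch`, `ReplyProducer` and `ResponseTemplateSketch` are hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical message with string headers.
final class ApiMsgSketch {
    final Map<String, String> headers = new HashMap<>();
    final String body;

    ApiMsgSketch(String body) {
        this.body = body;
    }
}

// Hypothetical producer: publish a message to the named topic.
interface ReplyProducer {
    void send(String topic, ApiMsgSketch msg);
}

final class ResponseTemplateSketch {
    private final long requestTimeoutMs;
    private final Function<ApiMsgSketch, ApiMsgSketch> handler; // e.g. token validation
    private final ReplyProducer producer;

    ResponseTemplateSketch(long requestTimeoutMs, Function<ApiMsgSketch, ApiMsgSketch> handler, ReplyProducer producer) {
        this.requestTimeoutMs = requestTimeoutMs;
        this.handler = handler;
        this.producer = producer;
    }

    /** Returns true if the request was handled and a reply was published. */
    boolean process(ApiMsgSketch request, long nowMs) {
        String requestTime = request.headers.get("requestTime");
        if (requestTime == null || Long.parseLong(requestTime) + requestTimeoutMs < nowMs) {
            return false; // too old: the caller has most likely given up on it already
        }
        String requestId = request.headers.get("requestId");
        String responseTopic = request.headers.get("responseTopic");
        if (requestId == null || responseTopic == null) {
            return false; // mirrors the "Missing ... in header" checks
        }
        ApiMsgSketch response = handler.apply(request);
        // Echo the correlation id so the caller can match the reply to its pending future.
        response.headers.put("requestId", requestId);
        // Reply to whichever topic the caller named, not to a fixed topic.
        producer.send(responseTopic, response);
        return true;
    }
}
```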
The actual validation logic:
```java
@Override
public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {
    TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();
    // Check whether the protobuf message carries a token-validation block
    if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
        ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
        // validateCredentials looks up the device info; the second function wraps the result
        // into a response message that keeps the request's key and headers
        return Futures.transform(validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN),
                value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()),
                MoreExecutors.directExecutor());
    }
    // ...
```
Once the device info has been found via the device's access token, it is published through the message queue on the topic tb_transport.api.responses.localHostName. As analyzed in part 3, DefaultTransportService's transportApiRequestTemplate subscribes to this topic:

```java
List<Response> responses = responseTemplate.poll(pollInterval);
if (responses.size() > 0) {
    log.trace("Polling responses completed, consumer records count [{}]", responses.size());
} else {
    continue;
}
responses.forEach(response -> {
    byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
    UUID requestId;
    if (requestIdHeader == null) {
        log.error("[{}] Missing requestId in header and body", response);
    } else {
        requestId = bytesToUuid(requestIdHeader);
        log.trace("[{}] Response received: {}", requestId, response);
        // See ② above: the validation future was put into pendingRequests; take it out again by requestId
        ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
        if (expectedResponse == null) {
            log.trace("[{}] Invalid or stale request", requestId);
        } else {
            // Set the result of the SettableFuture
            expectedResponse.future.set(response);
        }
    }
    // ...
```
The asynchronous request issued by DefaultTransportService's process now has its result, so the onSuccess callback fires, i.e. MqttTransportHandler's onValidateDeviceResponse:

```java
private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {
    if (!msg.hasDeviceInfo()) {
        ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
        ctx.close();
    } else {
        deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
        sessionInfo = SessionInfoProto.newBuilder()
                .setNodeId(context.getNodeId())
                .setSessionIdMSB(sessionId.getMostSignificantBits())
                .setSessionIdLSB(sessionId.getLeastSignificantBits())
                .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())
                .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
                .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
                .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
                .setDeviceName(msg.getDeviceInfo().getDeviceName())
                .setDeviceType(msg.getDeviceInfo().getDeviceType())
                .build();
        // Build a SessionEvent.OPEN message carrying sessionInfo; process() calls sendToDeviceActor
        transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
        // ...
```
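SessionInfoProto carries every UUID as a pair of 64-bit MSB/LSB fields, because protobuf has no UUID type. The split and the reverse conversion need only `java.util.UUID`; `UuidPair` is a hypothetical helper for illustration:

```java
import java.util.UUID;

// Hypothetical holder mirroring the *MSB/*LSB field pairs of SessionInfoProto.
final class UuidPair {
    final long msb;
    final long lsb;

    UuidPair(long msb, long lsb) {
        this.msb = msb;
        this.lsb = lsb;
    }

    // Split a UUID into its two 64-bit halves, as the builder calls above do.
    static UuidPair of(UUID id) {
        return new UuidPair(id.getMostSignificantBits(), id.getLeastSignificantBits());
    }

    // Rebuild the original UUID on the receiving side.
    UUID toUuid() {
        return new UUID(msb, lsb);
    }
}
```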
The implementation of sendToDeviceActor:
```java
protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
    // Resolve the tpi: a fixed partition id is chosen, the topic is tb_core and
    // fullTopicName is tb_core.<partition>, e.g. tb_core.1
    TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
    // ...
    // Send to the queue with tbCoreMsgProducer, with toDeviceActorMsg set
    tbCoreMsgProducer.send(tpi,
            new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
                    ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()),
            callback != null ? new TransportTbQueueCallback(callback) : null);
}
```
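partitionService.resolve maps an entity to a fixed partition, so the same device always lands on the same tb_core partition. A rough sketch of the idea, assuming a plain `UUID.hashCode()` modulo the partition count; ThingsBoard's partition service uses its own hash function and configuration, and `PartitionResolverSketch` is a hypothetical name:

```java
import java.util.UUID;

// Hypothetical hash-based partition resolver; not ThingsBoard's actual hashing.
final class PartitionResolverSketch {
    private final String topic;
    private final int partitions;

    PartitionResolverSketch(String topic, int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be > 0");
        }
        this.topic = topic;
        this.partitions = partitions;
    }

    int partitionFor(UUID entityId) {
        // floorMod keeps the result in [0, partitions) even for negative hash codes.
        return Math.floorMod(entityId.hashCode(), partitions);
    }

    // e.g. "tb_core.1" or, with topic "tb_rule_engine.main", "tb_rule_engine.main.1"
    String fullTopicName(UUID entityId) {
        return topic + "." + partitionFor(entityId);
    }
}
```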
From the analysis of DefaultTbCoreConsumerService in part 2, we know that its consumer subscribes to this topic:

```java
try {
    ToCoreMsg toCoreMsg = msg.getValue();
    if (toCoreMsg.hasToSubscriptionMgrMsg()) {
        log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
        forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
    } else if (toCoreMsg.hasToDeviceActorMsg()) {
        log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
        // Handled by this method
        forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
    }
    // ...
```
How forwardToDeviceActor handles the message:

```java
private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbCallback callback) {
    if (statsEnabled) {
        stats.log(toDeviceActorMsg);
    }
    // Wrap it in a message of type TRANSPORT_TO_DEVICE_ACTOR_MSG and hand it to the AppActor
    actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
}
```
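Using point 3 of the summary in part 4, we can go straight to how AppActor's doProcess method handles this message type. Tracing it shows that AppActor forwards the message to the TenantActor, which creates the DeviceActor and forwards the message to it. The DeviceActor handles this message type as follows: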
```java
protected boolean doProcess(TbActorMsg msg) {
    switch (msg.getMsgType()) {
        case TRANSPORT_TO_DEVICE_ACTOR_MSG:
            // Hand the TransportToDeviceActorMsgWrapper to the processor, which goes on to call processSessionStateMsgs
            processor.process(ctx, (TransportToDeviceActorMsgWrapper) msg);
            break;
        case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
        // ...
```
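The AppActor → TenantActor → DeviceActor hand-off, with child actors created on first use, can be pictured with nested maps. This is a synchronous toy model, not ThingsBoard's actor system (real actors process their mailboxes asynchronously), and all class names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical message addressed to one device of one tenant.
final class DeviceMsgSketch {
    final String tenantId;
    final String deviceId;
    final String payload;

    DeviceMsgSketch(String tenantId, String deviceId, String payload) {
        this.tenantId = tenantId;
        this.deviceId = deviceId;
        this.payload = payload;
    }
}

final class DeviceActorSketch {
    // Plain list: this sketch delivers messages synchronously on one thread.
    final List<String> received = new ArrayList<>();

    void tell(DeviceMsgSketch msg) {
        received.add(msg.payload);
    }
}

final class TenantActorSketch {
    final Map<String, DeviceActorSketch> devices = new ConcurrentHashMap<>();

    void tell(DeviceMsgSketch msg) {
        // Create the device actor on its first message, reuse it afterwards.
        devices.computeIfAbsent(msg.deviceId, id -> new DeviceActorSketch()).tell(msg);
    }
}

final class AppActorSketch {
    final Map<String, TenantActorSketch> tenants = new ConcurrentHashMap<>();

    void tell(DeviceMsgSketch msg) {
        // Same pattern one level up: one tenant actor per tenant id.
        tenants.computeIfAbsent(msg.tenantId, id -> new TenantActorSketch()).tell(msg);
    }
}
```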
How processSessionStateMsgs handles it:

```java
private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
    UUID sessionId = getSessionId(sessionInfo);
    if (msg.getEvent() == SessionEvent.OPEN) {
        // ...
        sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));
        if (sessions.size() == 1) {
            // Ends up calling pushRuleEngineMessage(stateData, CONNECT_EVENT)
            reportSessionOpen();
        }
        // Ends up calling pushRuleEngineMessage(stateData, ACTIVITY_EVENT)
        systemContext.getDeviceStateService().onDeviceActivity(deviceId, System.currentTimeMillis());
        dumpSessions();
    }
    // ...
```
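The bookkeeping in processSessionStateMsgs boils down to: remember the session, emit CONNECT_EVENT only when the first session opens, and report activity on every open (as the comments above describe). A minimal sketch that records the events in a list instead of pushing them to the rule engine; `DeviceSessionsSketch` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of the per-device session map kept by the device actor.
final class DeviceSessionsSketch {
    private final Map<UUID, String> sessions = new LinkedHashMap<>(); // sessionId -> nodeId
    final List<String> ruleEngineEvents = new ArrayList<>();          // events that would be pushed

    void onSessionOpen(UUID sessionId, String nodeId) {
        sessions.put(sessionId, nodeId);
        if (sessions.size() == 1) {
            ruleEngineEvents.add("CONNECT_EVENT"); // first session: the device just came online
        }
        ruleEngineEvents.add("ACTIVITY_EVENT");    // every open counts as device activity
    }

    int sessionCount() {
        return sessions.size();
    }
}
```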
Since CONNECT_EVENT and ACTIVITY_EVENT differ only in their type, we only follow CONNECT_EVENT here. The message is pushed to the rule engine by pushMsgToRuleEngine:

```java
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
    if (tenantId.isNullUid()) {
        if (entityId.getEntityType().equals(EntityType.TENANT)) {
            tenantId = new TenantId(entityId.getId());
        } else {
            log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg);
            return;
        }
    }
    // As in sendToDeviceActor above, resolve a tpi; an example fullTopicName is tb_rule_engine.main.1
    TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
    log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
    ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
            .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
            .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
            .setTbMsg(TbMsg.toByteString(tbMsg))
            .build();
    producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
    toRuleEngineMsgs.incrementAndGet();
}
```
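The tenant-id fallback at the top of pushMsgToRuleEngine, as a small JDK sketch: a `null` tenant id stands in for `isNullUid()`, and `EntityKind`, `EntityRefSketch` and `TenantResolverSketch` are hypothetical names:

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical subset of entity types.
enum EntityKind { TENANT, DEVICE }

final class EntityRefSketch {
    final EntityKind kind;
    final UUID id;

    EntityRefSketch(EntityKind kind, UUID id) {
        this.kind = kind;
        this.id = id;
    }
}

final class TenantResolverSketch {
    private TenantResolverSketch() {}

    // tenantId == null plays the role of a "null UID" tenant.
    static Optional<UUID> resolveTenant(UUID tenantId, EntityRefSketch entity) {
        if (tenantId != null) {
            return Optional.of(tenantId);
        }
        if (entity.kind == EntityKind.TENANT) {
            return Optional.of(entity.id); // the message is about the tenant itself
        }
        return Optional.empty(); // caller logs "Received invalid message" and drops it
    }
}
```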
From the analysis in part 2, DefaultTbRuleEngineConsumerService subscribes to this topic (tb_rule_engine.main.1). When a message arrives, it calls forwardToRuleEngineActor, which wraps it as a QUEUE_TO_RULE_ENGINE_MSG and hands it to the AppActor for dispatch. The AppActor passes it to the TenantActor, the TenantActor to the root rule chain's RuleChainActor, and the RuleChainActor to the first rule node, i.e. one particular RuleNodeActor.
- Open the RULE CHAINS page in the frontend and you will see that MESSAGE TYPE SWITCH is the first node receiving the input. In the database, the first_rule_node_id configured in the rule_chain table is exactly this TbMsgTypeSwitchNode.
- In TbMsgTypeSwitchNode's onMsg method (every rule node processes messages in onMsg), the relation type is chosen from the messageType (here CONNECT_EVENT) and ctx.tellNext(msg, relationType) is called.
- DefaultTbContext then creates a RuleNodeToRuleChainTellNextMsg of type RULE_TO_RULE_CHAIN_TELL_NEXT_MSG and hands it to the RuleChainActor.

Next, execution enters the onTellNext method of RuleChainActorMessageProcessor:

```java
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
    try {
        checkActive(msg);
        // Message originator
        EntityId entityId = msg.getOriginator();
        // Resolve a tpi, which may be needed below
        TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
        // Find the related rule nodes. These come from the relation table: the outgoing relations of the
        // originator node whose relation_type matches the relationType chosen in TbMsgTypeSwitchNode.
        // For a Connect Event no matching relation, and so no target RuleNodeId, is found.
        List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
                .filter(r -> contains(relationTypes, r.getType()))
                .collect(Collectors.toList());
        int relationsCount = relations.size();
        // No relation found (as for a Connect Event): the message has passed through the rule engine and is done
        if (relationsCount == 0) {
            log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
            if (relationTypes.contains(TbRelationTypes.FAILURE)) {
                RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
                if (ruleNodeCtx != null) {
                    msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
                } else {
                    log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
                    msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
                }
            } else {
                msg.getCallback().onSuccess();
            }
        } else if (relationsCount == 1) {
            // Example: the Post telemetry type does find a rule node, implemented by TbMsgTimeseriesNode,
            // so the message is handed to TbMsgTimeseriesNode
            for (RuleNodeRelation relation : relations) {
                log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
                pushToTarget(tpi, msg, relation.getOut(), relation.getType());
            }
        } else {
            MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
            log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations);
            for (RuleNodeRelation relation : relations) {
                EntityId target = relation.getOut();
                putToQueue(tpi, msg, callbackWrapper, target);
            }
        }
    } catch (RuleNodeException rne) {
        msg.getCallback().onFailure(rne);
    } catch (Exception e) {
        msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
    }
}
```
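The routing decision in onTellNext (zero, one or several matching relations) and the counting callback used for fan-out can be sketched without the actor system. Assumptions: the relation labels ("Post telemetry", "Connect Event", "Failure", ...) are illustrative, `pushToTarget` is a plain callback rather than a queue, and `CountingCallbackSketch` only approximates MultipleTbQueueTbMsgCallbackWrapper; all class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

// Hypothetical callback with the onSuccess/onFailure shape of a message callback.
interface MsgCallbackSketch {
    void onSuccess();

    void onFailure(Throwable t);
}

// Hypothetical fan-out wrapper: the delegate succeeds once all `count` targets have succeeded.
final class CountingCallbackSketch implements MsgCallbackSketch {
    private final AtomicInteger remaining;
    private final MsgCallbackSketch delegate;

    CountingCallbackSketch(int count, MsgCallbackSketch delegate) {
        this.remaining = new AtomicInteger(count);
        this.delegate = delegate;
    }

    @Override
    public void onSuccess() {
        if (remaining.decrementAndGet() == 0) {
            delegate.onSuccess();
        }
    }

    @Override
    public void onFailure(Throwable t) {
        delegate.onFailure(t); // any single failure fails the whole fan-out
    }
}

// Hypothetical outgoing relation: relation type (label) -> target rule node.
final class RelationSketch {
    final String type;
    final String target;

    RelationSketch(String type, String target) {
        this.type = type;
        this.target = target;
    }
}

final class RuleChainSketch {
    private final Map<String, List<RelationSketch>> routes;           // originator node -> relations
    private final BiConsumer<String, MsgCallbackSketch> pushToTarget; // hands the message to a node

    RuleChainSketch(Map<String, List<RelationSketch>> routes, BiConsumer<String, MsgCallbackSketch> pushToTarget) {
        this.routes = routes;
        this.pushToTarget = pushToTarget;
    }

    // Assumed subset of the labels a message-type switch node might emit.
    static String relationTypeFor(String msgType) {
        switch (msgType) {
            case "POST_TELEMETRY_REQUEST": return "Post telemetry";
            case "POST_ATTRIBUTES_REQUEST": return "Post attributes";
            case "CONNECT_EVENT": return "Connect Event";
            default: return "Other";
        }
    }

    /** Returns the target nodes the message was pushed to (empty when processing ends here). */
    List<String> tellNext(String originator, Set<String> relationTypes, MsgCallbackSketch callback) {
        List<RelationSketch> matched = routes.getOrDefault(originator, List.of()).stream()
                .filter(r -> relationTypes.contains(r.type))
                .collect(Collectors.toList());
        List<String> targets = new ArrayList<>();
        if (matched.isEmpty()) {
            if (relationTypes.contains("Failure")) {
                callback.onFailure(new RuntimeException("Failure during message processing by " + originator));
            } else {
                callback.onSuccess(); // no outgoing relation: the message is done
            }
            return targets;
        }
        // A single target gets the original callback; several targets share one counting wrapper.
        MsgCallbackSketch cb = matched.size() == 1 ? callback : new CountingCallbackSketch(matched.size(), callback);
        for (RelationSketch relation : matched) {
            targets.add(relation.target);
            pushToTarget.accept(relation.target, cb);
        }
        return targets;
    }
}
```

With one matching relation the original callback passes through unchanged; only a fan-out needs the counter, so the upstream callback fires once, after the last branch reports success.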
What’s more:
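As in the example above, telemetry data (Post telemetry) is processed further by TbMsgTimeseriesNode's onMsg: for example, the data is stored and then pushed over WebSocket if a WebSocket session exists, or other notifications are sent. We won't go into the details here.
Summary:
- Handling an MQTT connection effectively walks through the entire rule-engine flow. Other message types, such as telemetry, attribute updates, and sending and receiving RPC requests, follow largely the same path.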
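- When following a message flow, always be clear about which topic is being subscribed to or published to; that way you won't lose your bearings.
- The actor model dispatches a message step by step according to its type, starting from the AppActor, until it reaches the appropriate RuleNode.
- Protobuf messages are easy to serialize, transmit and parse, which is why ThingsBoard uses them heavily. The generated classes are not very readable, though; reading queue.proto directly gives a better intuitive sense of them.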
