1. DefaultTransportService

Analyzing the initialization method:

    @PostConstruct
    public void init() {
        // Whether rate limiting applies depends on the configuration
        if (rateLimitEnabled) {
            //Just checking the configuration parameters
            new TbRateLimits(perTenantLimitsConf);
            new TbRateLimits(perDevicesLimitsConf);
        }
        this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler"));
        this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
        this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity,
                new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
        // The creation of transportApiRequestTemplate is analyzed in ① below. It contains
        // a producer, producerTemplate (requestTemplate), topic: tb_transport.api.requests ②
        // and a consumer, consumerTemplate (responseTemplate), topic: tb_transport.api.responses.localHostName ③
        transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate();
        // The producerProvider bean is chosen by configuration. TbQueueProducerProvider has three
        // implementations, each guarded by @ConditionalOnExpression on the value of service.type
        // (default: monolith), so the implementation used here is TbCoreQueueProducerProvider.
        // Its @PostConstruct init() method initializes the following fields:
        //   1. toTbCore                   topic: tb_core
        //   2. toTransport                topic: tb_transport.notifications
        //   3. toRuleEngine               topic: tb_rule_engine
        //   4. toRuleEngineNotifications  topic: tb_rule_engine
        //   5. toTbCoreNotifications      topic: tb_core
        ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer();
        tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
        transportNotificationsConsumer = queueProvider.createTransportNotificationsConsumer();
        // fullTopic = tb_transport.notifications.localHostName
        TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceInfoProvider.getServiceId());
        transportNotificationsConsumer.subscribe(Collections.singleton(tpi));
        // See ④ below
        transportApiRequestTemplate.init();
        mainConsumerExecutor.execute(() -> {
            while (!stopped) {
                try {
                    List<TbProtoQueueMsg<ToTransportMsg>> records = transportNotificationsConsumer.poll(notificationsPollDuration);
                    if (records.size() == 0) {
                        continue;
                    }
                    records.forEach(record -> {
                        try {
                            processToTransportMsg(record.getValue());
                        } catch (Throwable e) {
                            log.warn("Failed to process the notification.", e);
                        }
                    });
                    transportNotificationsConsumer.commit();
                } catch (Exception e) {
                    if (!stopped) {
                        log.warn("Failed to obtain messages from queue.", e);
                        try {
                            Thread.sleep(notificationsPollDuration);
                        } catch (InterruptedException e2) {
                            log.trace("Failed to wait until the server has capacity to handle new requests", e2);
                        }
                    }
                }
            }
        });
    }

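The consumer loop at the end of init() follows a poll, process, commit pattern in which a failure on one record does not abort the rest of the batch. The following is a minimal sketch of one iteration of that pattern, assuming a plain BlockingQueue of strings in place of the real consumer. The class NotificationLoopSketch and its methods poll and pollOnce are hypothetical names used only for illustration; they are not part of ThingsBoard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class NotificationLoopSketch {

    // Hypothetical stand-in for a queue consumer: waits up to timeoutMs for the
    // first record, then takes whatever else is already queued.
    static List<String> poll(BlockingQueue<String> queue, long timeoutMs) {
        List<String> batch = new ArrayList<>();
        try {
            String first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what we have (an empty batch).
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    // One loop iteration: poll a batch, handle each record in isolation,
    // and report how many records were handled successfully.
    static int pollOnce(BlockingQueue<String> queue, long timeoutMs, Consumer<String> handler) {
        int ok = 0;
        for (String record : poll(queue, timeoutMs)) {
            try {
                handler.accept(record);
                ok++;
            } catch (RuntimeException e) {
                // A bad record is logged and skipped so the rest of the batch still runs.
                System.err.println("Failed to process the notification: " + e.getMessage());
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "bad", "c"));
        int ok = pollOnce(queue, 10, r -> {
            if (r.equals("bad")) {
                throw new IllegalStateException("cannot handle " + r);
            }
        });
        System.out.println(ok + " of 3 records processed"); // prints "2 of 3 records processed"
    }
}
```

The real loop additionally calls commit() after the batch and backs off with a sleep when poll itself fails; both are omitted here to keep the sketch short.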
① createTransportApiRequestTemplate in InMemoryTbTransportQueueFactory. Because no message-queue middleware is enabled in our setup, the in-memory factory is the one in use, so InMemoryTbTransportQueueFactory is what we analyze:

    public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
        // The producer is created on the topic read from queue.transport_api.requests_topic,
        // which is tb_transport.api.requests
        InMemoryTbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
                new InMemoryTbQueueProducer<>(transportApiSettings.getRequestsTopic());
        // The topic read from queue.transport_api.responses_topic is tb_transport.api.responses.
        // The serviceId is appended to it (as noted in part two of this series, the local HostName
        // serves as the serviceId), so the consumer topic is tb_transport.api.responses.localHostName
        InMemoryTbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
                new InMemoryTbQueueConsumer<>(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId());
        // The builder pattern returns a TbQueueRequestTemplate instance that holds one consumer and one producer
        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
                <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
        templateBuilder.queueAdmin(new TbQueueAdmin() {
            @Override
            public void createTopicIfNotExists(String topic) {}

            @Override
            public void destroy() {}
        });
        templateBuilder.requestTemplate(producerTemplate);
        templateBuilder.responseTemplate(consumerTemplate);
        templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
        templateBuilder.maxRequestTimeout(transportApiSettings.getMaxRequestsTimeout());
        templateBuilder.pollInterval(transportApiSettings.getResponsePollInterval());
        return templateBuilder.build();
    }

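The point of appending the serviceId is that every transport instance listens on its own response topic, while all of them share a single request topic. A minimal sketch of that naming scheme follows, assuming an in-memory broker that is nothing more than a map from topic name to queue. InMemoryTopicsSketch, responsesTopicFor, send and receive are hypothetical names, and the service ids host-a and host-b are made up for the example.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class InMemoryTopicsSketch {

    // Assumed storage model: one queue per topic name, created on first use.
    private static final Map<String, BlockingQueue<String>> TOPICS = new ConcurrentHashMap<>();

    // Mirrors the concatenation in the factory: responses topic + "." + serviceId.
    static String responsesTopicFor(String responsesTopic, String serviceId) {
        return responsesTopic + "." + serviceId;
    }

    // Producer side: append a message to the queue behind the topic.
    static void send(String topic, String msg) {
        TOPICS.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>()).add(msg);
    }

    // Consumer side: take the next message, or null if the topic is empty or unknown.
    static String receive(String topic) {
        BlockingQueue<String> queue = TOPICS.get(topic);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        String hostA = responsesTopicFor("tb_transport.api.responses", "host-a");
        String hostB = responsesTopicFor("tb_transport.api.responses", "host-b");
        send(hostA, "reply for host-a");
        System.out.println(receive(hostA)); // prints "reply for host-a"
        System.out.println(receive(hostB)); // prints "null": host-b never sees host-a's reply
    }
}
```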
④ init() in DefaultTbQueueRequestTemplate:

    public void init() {
        queueAdmin.createTopicIfNotExists(responseTemplate.getTopic());
        // Each middleware provides its own initialization; for in-memory the method body is empty
        this.requestTemplate.init();
        tickTs = System.currentTimeMillis();
        // See ③: the subscribed topic is tb_transport.api.responses.localHostName
        responseTemplate.subscribe();
        executor.submit(() -> {
            long nextCleanupMs = 0L;
            while (!stopped) {
                try {
                    // Fetch messages from the message queue
                    List<Response> responses = responseTemplate.poll(pollInterval);
                    ...........

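The listing stops right after responseTemplate.poll(pollInterval), so what happens to each polled response is not shown. One common way to complete a request/response exchange over two topics is to keep a map of pending requests keyed by a request id and to complete the matching entry when its response arrives. The sketch below illustrates that idea under this assumption; it is not the elided ThingsBoard code. RequestTemplateSketch, send and onResponse are hypothetical names, and the maxPendingRequests check only mirrors the setting of the same name that the builder receives.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class RequestTemplateSketch {

    // Requests that have been sent but not yet answered, keyed by request id.
    private final Map<UUID, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final int maxPendingRequests;

    RequestTemplateSketch(int maxPendingRequests) {
        this.maxPendingRequests = maxPendingRequests;
    }

    // Registers a future for the request. A real template would also publish the
    // request (with its id and the reply topic) to the requests topic at this point.
    CompletableFuture<String> send(UUID requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (pending.size() >= maxPendingRequests) {
            future.completeExceptionally(new IllegalStateException("Pending request map is full"));
            return future;
        }
        pending.put(requestId, future);
        return future;
    }

    // Called by the polling loop for every response read from the responses topic.
    // Returns false when no request is waiting for this id (unknown or already answered).
    boolean onResponse(UUID requestId, String payload) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        future.complete(payload);
        return true;
    }

    public static void main(String[] args) {
        RequestTemplateSketch template = new RequestTemplateSketch(10);
        UUID id = UUID.randomUUID();
        CompletableFuture<String> reply = template.send(id);
        template.onResponse(id, "credentials ok");
        System.out.println(reply.join()); // prints "credentials ok"
    }
}
```

A complete template would also expire entries that wait longer than the configured request timeout, which is presumably what the nextCleanupMs variable in the listing is for.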
2. TbCoreTransportApiService

The method annotated with @PostConstruct:

    @PostConstruct
    public void init() {
        this.transportCallbackExecutor = Executors.newWorkStealingPool(maxCallbackThreads);
        // The topic is the value of queue.transport_api.responses_topic, default: tb_transport.api.responses ⑤
        TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> producer = tbCoreQueueFactory.createTransportApiResponseProducer();
        // The topic is the value of queue.transport_api.requests_topic, default: tb_transport.api.requests ⑥
        TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> consumer = tbCoreQueueFactory.createTransportApiRequestConsumer();
        DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder
                <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> builder = DefaultTbQueueResponseTemplate.builder();
        builder.requestTemplate(consumer);
        builder.responseTemplate(producer);
        builder.maxPendingRequests(maxPendingRequests);
        builder.requestTimeout(requestTimeout);
        builder.pollInterval(responsePollDuration);
        builder.executor(transportCallbackExecutor);
        builder.handler(transportApiService);
        transportApiTemplate = builder.build();
    }

The method annotated with @EventListener(ApplicationReadyEvent.class) calls transportApiTemplate.init(transportApiService). Here transportApiTemplate is the DefaultTbQueueResponseTemplate object created in the previous step, and its init() method is:

    @Override
    public void init(TbQueueHandler<Request, Response> handler) {
        // Each middleware provides its own initialization; for in-memory the method body is empty
        this.responseTemplate.init();
        // See ⑥: the subscribed topic is tb_transport.api.requests
        requestTemplate.subscribe();
        loopExecutor.submit(() -> {
            while (!stopped) {
                try {
                    while (pendingRequestCount.get() >= maxPendingRequests) {
                        try {
                            Thread.sleep(pollInterval);
                        } catch (InterruptedException e) {
                            log.trace("Failed to wait until the server has capacity to handle new requests", e);
                        }
                    }
                    List<Request> requests = requestTemplate.poll(pollInterval);
                    ...........

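The inner while loop is a simple form of backpressure: the polling thread does not read new requests until the number of requests still being handled drops below maxPendingRequests. The sketch below isolates that gate, assuming the counter is incremented when a request is accepted and decremented once its response has been sent; the listing does not show where the real counter changes. ResponseBackpressureSketch and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ResponseBackpressureSketch {

    private final AtomicInteger pendingRequestCount = new AtomicInteger();
    private final int maxPendingRequests;

    ResponseBackpressureSketch(int maxPendingRequests) {
        this.maxPendingRequests = maxPendingRequests;
    }

    boolean hasCapacity() {
        return pendingRequestCount.get() < maxPendingRequests;
    }

    // Assumed bookkeeping: one increment per accepted request, one decrement per sent response.
    void onRequestAccepted() {
        pendingRequestCount.incrementAndGet();
    }

    void onResponseSent() {
        pendingRequestCount.decrementAndGet();
    }

    // Blocks the polling thread in pollIntervalMs-sized sleeps until capacity is available.
    // Unlike the original loop, this sketch gives up waiting when the thread is interrupted.
    void awaitCapacity(long pollIntervalMs) {
        while (!hasCapacity()) {
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) {
        ResponseBackpressureSketch gate = new ResponseBackpressureSketch(1);
        gate.onRequestAccepted();
        System.out.println(gate.hasCapacity()); // prints "false": the single slot is taken
        gate.onResponseSent();
        gate.awaitCapacity(25);                 // returns immediately, the slot is free again
        System.out.println(gate.hasCapacity()); // prints "true"
    }
}
```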
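Summary
The startup of DefaultTransportService and TbCoreTransportApiService is not very complicated. What deserves most of our attention is the topics of the consumers and producers that these two beans initialize. ThingsBoard uses queue middleware to decouple message producers from consumers, so with a conventional debugging approach it is easy to lose track of where a message goes. Treating the topics as the key entry point makes the analysis of the overall data flow in the following parts much easier.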
