Thingsboard源码探索(1)

作者: 哦呵呵_3579 | 来源:发表于2020-08-05 10:38 被阅读0次
    • 该系列文章基于Thingsboard release-3.0分支的源码进行分析,可能与最新的特性有所区别。

    初识项目

    拉代码

    把我们把代码pull下来,打开IDEA,相信大多数人的反应会是,卧槽,这什么项目,怎么这么多模块!

    我刚把项目拉下来的时候也是一脸懵逼,完全不知道这么多module是怎么划分的,脸上大写的两个字,卧槽!

    整理思路

    既来之则安之,虽然是满脸的卧槽,但也只能静下心来慢慢分析了。其实tb的源码模块化做的还是不错的,起码让我写这代码我写不到这程度,不理解还是由于模块之间的依赖关系不清晰导致的。

    找官方文档

    根据tb的官网我们知道tb主要由一下这几种微服务

    tb-http-transport 
    tb-mqtt-transport
    tb-coap-transport
    tb-core
    tb-rule-engine
    tb-js-executor
    tb-web-ui
    
    官网链接:[https://thingsboard.io/docs/reference/](https://thingsboard.io/docs/reference/)
    
    

    从源码里面找到对应的服务

    既然都已经知道有哪些服务了,那我只要在这一堆模块中找到对应服务的启动类就完事儿了,然后按照这个逻辑不就一下子就能找到对应服务的代码了嘛。照着这个思路,一番查找之后找到了一下几个服务

    tb-http-transport---./transport/http
    tb-mqtt-transport---./transport/mqtt
    tb-coap-transport---./transport/coap
    tb-js-executor---./msa/js-executor
    tb-web-ui---./ui-ngx
    

    其中js-executor和web-ui因为是ng和前端写的,所以可以分开来看,但是这几个transport就一个启动类是个什么鬼,而且也没有找到tb-core以及tb-rule-engine,只有ThingsboardInstallApplication和ThingsboardServerApplication这俩莫名其妙的启动类,这时候心里已经开始骂娘了。

    虽然没法一下子就搞清楚项目,但起码还是通过几个启动类把服务大致划分了一下,接下来那就只能看一下这几个启动类对应服务的依赖关系了,看看能不能找到一些线索。

    分析服务引用

    分析transport服务的引用

    由于transport是一个比较独立的模块,且不同的transport功能基本一致,所以先从这个开始分析,我先选取了mqtt-transport这个模块进行分析,它的pom文件展示了它的依赖关系

            <dependency>
                <groupId>org.thingsboard.common.transport</groupId>
                <artifactId>mqtt</artifactId>
            </dependency>
            <dependency>
                <groupId>org.thingsboard.common</groupId>
                <artifactId>queue</artifactId>
            </dependency>
    

    可以看到这两个主要的依赖,从名字上基本上可以猜到queue肯定是transport的管道服务,因为之前搭建过tb的集群,所以基本猜到里面可能会有与kafka的管道相关的代码,而mqtt这个依赖基本上可以猜到是一个mqttserver服务,接下来就是分析这两个模块的源码。

    完整的依赖

    transport-dependency.png

    从这张图可以大致可以看出mqtt-transport这个服务通过transport-mqtt模块实现一个mqttserver的功能,然后通过queue模块与建立管道服务从而与其他服务进行交互,接下来就是具体看这两个模块的代码。

    分析mqtt模块源码

    源码路径位于 ./common/transport/mqtt

    果然这个模块就是一个用netty封装的mqttserver,而且还通过配置文件的条件来判断是否要初始化这个mqttserver服务。

    package org.thingsboard.server.transport.mqtt;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.util.ResourceLeakDetector;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    
    /**
     * @author Andrew Shvayka
     */
    @Service("MqttTransportService")
    @ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.mqtt.enabled}'=='true')")
    @Slf4j
    public class MqttTransportService {
    
        @Value("${transport.mqtt.bind_address}")
        private String host;
        @Value("${transport.mqtt.bind_port}")
        private Integer port;
    
        @Value("${transport.mqtt.netty.leak_detector_level}")
        private String leakDetectorLevel;
        @Value("${transport.mqtt.netty.boss_group_thread_count}")
        private Integer bossGroupThreadCount;
        @Value("${transport.mqtt.netty.worker_group_thread_count}")
        private Integer workerGroupThreadCount;
        @Value("${transport.mqtt.netty.so_keep_alive}")
        private boolean keepAlive;
    
        @Autowired
        private MqttTransportContext context;
    
        private Channel serverChannel;
        private EventLoopGroup bossGroup;
        private EventLoopGroup workerGroup;
    
        @PostConstruct
        public void init() throws Exception {
            log.info("Setting resource leak detector level to {}", leakDetectorLevel);
            ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
    
            log.info("Starting MQTT transport...");
            bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
            workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MqttTransportServerInitializer(context))
                    .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
    
            serverChannel = b.bind(host, port).sync().channel();
            log.info("Mqtt transport started!");
        }
    
        @PreDestroy
        public void shutdown() throws InterruptedException {
            log.info("Stopping MQTT transport!");
            try {
                serverChannel.close().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
            log.info("MQTT transport stopped!");
        }
    }
    
    

    对于熟悉netty的朋友来说这个就比较简单了,就是根据handler来处理接收到的消息,这个其实就没什么好多说了。不熟悉netty的朋友可以自己去百度一下就好了,就是简单的接收到消息之后丢给org.thingsboard.server.transport.mqtt.MqttTransportHandler这个类来处理而已,具体的消息处理则都是通过handler来进行。

    分析MqttTransportHandler

    消息都是从channelRead方法进入到handler,然后丢到processMqttMsg这个方法进行具体的分类处理。

        private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
            address = (InetSocketAddress) ctx.channel().remoteAddress();
            if (msg.fixedHeader() == null) {
                log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
                processDisconnect(ctx);
                return;
            }
            deviceSessionCtx.setChannel(ctx);
            switch (msg.fixedHeader().messageType()) {
                case CONNECT:
                    processConnect(ctx, (MqttConnectMessage) msg);
                    break;
                case PUBLISH:
                    processPublish(ctx, (MqttPublishMessage) msg);
                    break;
                case SUBSCRIBE:
                    processSubscribe(ctx, (MqttSubscribeMessage) msg);
                    break;
                case UNSUBSCRIBE:
                    processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                    break;
                case PINGREQ:
                    if (checkConnected(ctx, msg)) {
                        ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
                        transportService.reportActivity(sessionInfo);
                    }
                    break;
                case DISCONNECT:
                    if (checkConnected(ctx, msg)) {
                        processDisconnect(ctx);
                    }
                    break;
                default:
                    break;
            }
        }
    

    通过这个handler,消息会按照类型来做不通的处理。越往下面看就会有更多的模块之间的依赖,将模块独立出来分析反而不是很方便,所以接下来换一种思路,从一个完整的鉴权流程来分析源码以及模块之间的依赖关系。

    分析连接鉴权流程

    tb默认的设备鉴权方式是token的方式,所以先挑这个来分析整个的数据上下行流程,因此我们详细分析一下processAuthTokenConnect这个方法,源码如下。

        private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
            String userName = msg.payload().userName();
            log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
            if (StringUtils.isEmpty(userName)) {
                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
                ctx.close();
            } else {
                transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
                        new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                            @Override
                            public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
                                onValidateDeviceResponse(msg, ctx);
                            }
    
                            @Override
                            public void onError(Throwable e) {
                                log.trace("[{}] Failed to process credentials: {}", address, userName, e);
                                ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
                                ctx.close();
                            }
                        });
            }
        }
    

    通过将token提取出来,然后拼装成一个ValidateDeviceTokenRequestMsg对象之后调用transportService服务。其中transportService服务的实现类为org.thingsboard.server.common.transport.service.DefaultTransportService,该类属于transport-api模块。

    @Slf4j
    @Service
    @ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport'")
    public class DefaultTransportService implements TransportService {
    
        @Value("${transport.rate_limits.enabled}")
        private boolean rateLimitEnabled;
        @Value("${transport.rate_limits.tenant}")
    

    通过process将消息发送出去,process的时候又调用了transportApiRequestTemplate这个对象

    @Override
        public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
            log.trace("Processing msg: {}", msg);
            TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build());
            AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg),
                    response -> callback.onSuccess(response.getValue().getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
        }
    

    这个对象对应的实现类位于queue模块,org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate,他的send方法会返回一个ListenableFuture

    @Override
        public ListenableFuture<Response> send(Request request) {
            if (tickSize > maxPendingRequests) {
                return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
            }
            UUID requestId = UUID.randomUUID();
            request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
            request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
            request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
            SettableFuture<Response> future = SettableFuture.create();
            ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
            pendingRequests.putIfAbsent(requestId, responseMetaData);
            log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
            requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
                @Override
                public void onSuccess(TbQueueMsgMetadata metadata) {
                    log.trace("[{}] Request sent: {}", requestId, metadata);
                }
    
                @Override
                public void onFailure(Throwable t) {
                    pendingRequests.remove(requestId);
                    future.setException(t);
                }
            });
            return future;
        }
    

    send方法通过requestTemplate进行消息的发送,因为使用的是Kafka作为消息管道,所以其实现类为queue模块的org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate类,调用其send方法将消息真实发送出去,其中producer就是一个kafka的producer实现,最后将信息发送到tb_transport.api.requests这个topical中。

    @Override
        public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
            createTopicIfNotExist(tpi);
            String key = msg.getKey().toString();
            byte[] data = msg.getData();
            ProducerRecord<String, byte[]> record;
            Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
            record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers);
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    if (callback != null) {
                        callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata));
                    }
                } else {
                    if (callback != null) {
                        callback.onFailure(exception);
                    } else {
                        log.warn("Producer template failure: {}", exception.getMessage(), exception);
                    }
                }
            });
        }
    

    乍看至下认证的流程已经完成了,其实仔细一看里面问题还很多,首先需要问的是这个认证信息是如何返回给mqtt-transport的?

    像kafka这类消息管道都是异步进行数据的传输的,所以发送消息之后最多会收到一个是否发送成功的信息,实际上是没有认证的返回信息的,但是在MqttTransportHandler的processAuthTokenConnect方法里面明确是有对返回信息做进一步核实的代码的,那这个返回信息到底是怎么来的呢?

    如何拿到认证的返回信息

    在tb的代码中有很多类似的异步操作,并且都是要对返回信息进行确认的。比如这个认证信息,其实是通过queue模块的org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate方法来实现的,下面是处理的源码

        @Override
        public void init() {
            queueAdmin.createTopicIfNotExists(responseTemplate.getTopic());
            this.requestTemplate.init();
            tickTs = System.currentTimeMillis();
            responseTemplate.subscribe();
            executor.submit(() -> {
                long nextCleanupMs = 0L;
                while (!stopped) {
                    try {
                        List<Response> responses = responseTemplate.poll(pollInterval);
                        if (responses.size() > 0) {
                            log.trace("Polling responses completed, consumer records count [{}]", responses.size());
                        } else {
                            continue;
                        }
                        responses.forEach(response -> {
                            byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
                            UUID requestId;
                            if (requestIdHeader == null) {
                                log.error("[{}] Missing requestId in header and body", response);
                            } else {
                                requestId = bytesToUuid(requestIdHeader);
                                log.trace("[{}] Response received: {}", requestId, response);
                                ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
                                if (expectedResponse == null) {
                                    log.trace("[{}] Invalid or stale request", requestId);
                                } else {
                                    expectedResponse.future.set(response);
                                }
                            }
                        });
                        responseTemplate.commit();
                        tickTs = System.currentTimeMillis();
                        tickSize = pendingRequests.size();
                        if (nextCleanupMs < tickTs) {
                            //cleanup;
                            pendingRequests.forEach((key, value) -> {
                                if (value.expTime < tickTs) {
                                    ResponseMetaData<Response> staleRequest = pendingRequests.remove(key);
                                    if (staleRequest != null) {
                                        log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", key, staleRequest.expTime, tickTs);
                                        staleRequest.future.setException(new TimeoutException());
                                    }
                                }
                            });
                            nextCleanupMs = tickTs + maxRequestTimeout;
                        }
                    } catch (Throwable e) {
                        log.warn("Failed to obtain responses from queue.", e);
                        try {
                            Thread.sleep(pollInterval);
                        } catch (InterruptedException e2) {
                            log.trace("Failed to wait until the server has capacity to handle new responses", e2);
                        }
                    }
                }
            });
        }
    

    服务启动的时候会创建一个response对应topical的消费者,对应的topical格式为tb_transport.api.responses.hostname,然后从这个topical拉取消息。

    DefaultTbQueueRequestTemplate.send()方法在发送的时候会将当前需要发送的消息存入pendingRequests对象,以requestId为唯一ID,将返回的Future装箱之后作为value。当从topical中获取到消息,且ID可以对应上的时候会将pendingRequests对应key的value的状态设置为已完成,然后出发回调。

    回调之后就可以拿到core返回的认证信息了,这样就完成了一个完整的认证流程。

    相关文章

      网友评论

        本文标题:Thingsboard源码探索(1)

        本文链接:https://www.haomeiwen.com/subject/pdkerktx.html