- 该系列文章基于Thingsboard release-3.0分支的源码进行分析,可能与最新的特性有所区别。
初识项目
拉代码
把我们把代码pull下来,打开IDEA,相信大多数人的反应会是,卧槽,这什么项目,怎么这么多模块!
我刚把项目拉下来的时候也是一脸懵逼,完全不知道这么多module是怎么划分的,脸上大写的两个字,卧槽!
整理思路
既来之则安之,虽然是满脸的卧槽,但也只能静下心来慢慢分析了。其实tb的源码模块化做的还是不错的,起码让我写这代码我写不到这程度,不理解还是由于模块之间的依赖关系不清晰导致的。
找官方文档
根据tb的官网我们知道tb主要由一下这几种微服务
tb-http-transport
tb-mqtt-transport
tb-coap-transport
tb-core
tb-rule-engine
tb-js-executor
tb-web-ui
官网链接:[https://thingsboard.io/docs/reference/](https://thingsboard.io/docs/reference/)
从源码里面找到对应的服务
既然都已经知道有哪些服务了,那我只要在这一堆模块中找到对应服务的启动类就完事儿了,然后按照这个逻辑不就一下子就能找到对应服务的代码了嘛。照着这个思路,一番查找之后找到了一下几个服务
tb-http-transport---./transport/http
tb-mqtt-transport---./transport/mqtt
tb-coap-transport---./transport/coap
tb-js-executor---./msa/js-executor
tb-web-ui---./ui-ngx
其中js-executor和web-ui因为是ng和前端写的,所以可以分开来看,但是这几个transport就一个启动类是个什么鬼,而且也没有找到tb-core以及tb-rule-engine,只有ThingsboardInstallApplication和ThingsboardServerApplication这俩莫名其妙的启动类,这时候心里已经开始骂娘了。
虽然没法一下子就搞清楚项目,但起码还是通过几个启动类把服务大致划分了一下,接下来那就只能看一下这几个启动类对应服务的依赖关系了,看看能不能找到一些线索。
分析服务引用
分析transport服务的引用
由于transport是一个比较独立的模块,且不同的transport功能基本一致,所以先从这个开始分析,我先选取了mqtt-transport这个模块进行分析,它的pom文件展示了它的依赖关系
<dependency>
<groupId>org.thingsboard.common.transport</groupId>
<artifactId>mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>queue</artifactId>
</dependency>
可以看到这两个主要的依赖,从名字上基本上可以猜到queue肯定是transport的管道服务,因为之前搭建过tb的集群,所以基本猜到里面可能会有与kafka的管道相关的代码,而mqtt这个依赖基本上可以猜到是一个mqttserver服务,接下来就是分析这两个模块的源码。
完整的依赖
transport-dependency.png从这张图可以大致可以看出mqtt-transport这个服务通过transport-mqtt模块实现一个mqttserver的功能,然后通过queue模块与建立管道服务从而与其他服务进行交互,接下来就是具体看这两个模块的代码。
分析mqtt模块源码
源码路径位于 ./common/transport/mqtt
果然这个模块就是一个用netty封装的mqttserver,而且还通过配置文件的条件来判断是否要初始化这个mqttserver服务。
package org.thingsboard.server.transport.mqtt;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ResourceLeakDetector;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
* @author Andrew Shvayka
*/
@Service("MqttTransportService")
@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.mqtt.enabled}'=='true')")
@Slf4j
public class MqttTransportService {
@Value("${transport.mqtt.bind_address}")
private String host;
@Value("${transport.mqtt.bind_port}")
private Integer port;
@Value("${transport.mqtt.netty.leak_detector_level}")
private String leakDetectorLevel;
@Value("${transport.mqtt.netty.boss_group_thread_count}")
private Integer bossGroupThreadCount;
@Value("${transport.mqtt.netty.worker_group_thread_count}")
private Integer workerGroupThreadCount;
@Value("${transport.mqtt.netty.so_keep_alive}")
private boolean keepAlive;
@Autowired
private MqttTransportContext context;
private Channel serverChannel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
@PostConstruct
public void init() throws Exception {
log.info("Setting resource leak detector level to {}", leakDetectorLevel);
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
log.info("Starting MQTT transport...");
bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MqttTransportServerInitializer(context))
.childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!");
}
@PreDestroy
public void shutdown() throws InterruptedException {
log.info("Stopping MQTT transport!");
try {
serverChannel.close().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
log.info("MQTT transport stopped!");
}
}
对于熟悉netty的朋友来说这个就比较简单了,就是根据handler来处理接收到的消息,这个其实就没什么好多说了。不熟悉netty的朋友可以自己去百度一下就好了,就是简单的接收到消息之后丢给org.thingsboard.server.transport.mqtt.MqttTransportHandler这个类来处理而已,具体的消息处理则都是通过handler来进行。
分析MqttTransportHandler
消息都是从channelRead方法进入到handler,然后丢到processMqttMsg这个方法进行具体的分类处理。
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
address = (InetSocketAddress) ctx.channel().remoteAddress();
if (msg.fixedHeader() == null) {
log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
processDisconnect(ctx);
return;
}
deviceSessionCtx.setChannel(ctx);
switch (msg.fixedHeader().messageType()) {
case CONNECT:
processConnect(ctx, (MqttConnectMessage) msg);
break;
case PUBLISH:
processPublish(ctx, (MqttPublishMessage) msg);
break;
case SUBSCRIBE:
processSubscribe(ctx, (MqttSubscribeMessage) msg);
break;
case UNSUBSCRIBE:
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case PINGREQ:
if (checkConnected(ctx, msg)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
transportService.reportActivity(sessionInfo);
}
break;
case DISCONNECT:
if (checkConnected(ctx, msg)) {
processDisconnect(ctx);
}
break;
default:
break;
}
}
通过这个handler,消息会按照类型来做不通的处理。越往下面看就会有更多的模块之间的依赖,将模块独立出来分析反而不是很方便,所以接下来换一种思路,从一个完整的鉴权流程来分析源码以及模块之间的依赖关系。
分析连接鉴权流程
tb默认的设备鉴权方式是token的方式,所以先挑这个来分析整个的数据上下行流程,因此我们详细分析一下processAuthTokenConnect这个方法,源码如下。
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
String userName = msg.payload().userName();
log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
if (StringUtils.isEmpty(userName)) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
ctx.close();
} else {
transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
@Override
public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
onValidateDeviceResponse(msg, ctx);
}
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to process credentials: {}", address, userName, e);
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
ctx.close();
}
});
}
}
通过将token提取出来,然后拼装成一个ValidateDeviceTokenRequestMsg对象之后调用transportService服务。其中transportService服务的实现类为org.thingsboard.server.common.transport.service.DefaultTransportService,该类属于transport-api模块。
@Slf4j
@Service
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport'")
public class DefaultTransportService implements TransportService {
@Value("${transport.rate_limits.enabled}")
private boolean rateLimitEnabled;
@Value("${transport.rate_limits.tenant}")
通过process将消息发送出去,process的时候又调用了transportApiRequestTemplate这个对象
@Override
public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
log.trace("Processing msg: {}", msg);
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build());
AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg),
response -> callback.onSuccess(response.getValue().getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
}
这个对象对应的实现类位于queue模块,org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate,他的send方法会返回一个ListenableFuture
@Override
public ListenableFuture<Response> send(Request request) {
if (tickSize > maxPendingRequests) {
return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
}
UUID requestId = UUID.randomUUID();
request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
SettableFuture<Response> future = SettableFuture.create();
ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
pendingRequests.putIfAbsent(requestId, responseMetaData);
log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.trace("[{}] Request sent: {}", requestId, metadata);
}
@Override
public void onFailure(Throwable t) {
pendingRequests.remove(requestId);
future.setException(t);
}
});
return future;
}
send方法通过requestTemplate进行消息的发送,因为使用的是Kafka作为消息管道,所以其实现类为queue模块的org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate类,调用其send方法将消息真实发送出去,其中producer就是一个kafka的producer实现,最后将信息发送到tb_transport.api.requests这个topical中。
@Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
createTopicIfNotExist(tpi);
String key = msg.getKey().toString();
byte[] data = msg.getData();
ProducerRecord<String, byte[]> record;
Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
if (callback != null) {
callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata));
}
} else {
if (callback != null) {
callback.onFailure(exception);
} else {
log.warn("Producer template failure: {}", exception.getMessage(), exception);
}
}
});
}
乍看至下认证的流程已经完成了,其实仔细一看里面问题还很多,首先需要问的是这个认证信息是如何返回给mqtt-transport的?
像kafka这类消息管道都是异步进行数据的传输的,所以发送消息之后最多会收到一个是否发送成功的信息,实际上是没有认证的返回信息的,但是在MqttTransportHandler的processAuthTokenConnect方法里面明确是有对返回信息做进一步核实的代码的,那这个返回信息到底是怎么来的呢?
如何拿到认证的返回信息
在tb的代码中有很多类似的异步操作,并且都是要对返回信息进行确认的。比如这个认证信息,其实是通过queue模块的org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate方法来实现的,下面是处理的源码
@Override
public void init() {
queueAdmin.createTopicIfNotExists(responseTemplate.getTopic());
this.requestTemplate.init();
tickTs = System.currentTimeMillis();
responseTemplate.subscribe();
executor.submit(() -> {
long nextCleanupMs = 0L;
while (!stopped) {
try {
List<Response> responses = responseTemplate.poll(pollInterval);
if (responses.size() > 0) {
log.trace("Polling responses completed, consumer records count [{}]", responses.size());
} else {
continue;
}
responses.forEach(response -> {
byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
UUID requestId;
if (requestIdHeader == null) {
log.error("[{}] Missing requestId in header and body", response);
} else {
requestId = bytesToUuid(requestIdHeader);
log.trace("[{}] Response received: {}", requestId, response);
ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
if (expectedResponse == null) {
log.trace("[{}] Invalid or stale request", requestId);
} else {
expectedResponse.future.set(response);
}
}
});
responseTemplate.commit();
tickTs = System.currentTimeMillis();
tickSize = pendingRequests.size();
if (nextCleanupMs < tickTs) {
//cleanup;
pendingRequests.forEach((key, value) -> {
if (value.expTime < tickTs) {
ResponseMetaData<Response> staleRequest = pendingRequests.remove(key);
if (staleRequest != null) {
log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", key, staleRequest.expTime, tickTs);
staleRequest.future.setException(new TimeoutException());
}
}
});
nextCleanupMs = tickTs + maxRequestTimeout;
}
} catch (Throwable e) {
log.warn("Failed to obtain responses from queue.", e);
try {
Thread.sleep(pollInterval);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new responses", e2);
}
}
}
});
}
服务启动的时候会创建一个response对应topical的消费者,对应的topical格式为tb_transport.api.responses.hostname,然后从这个topical拉取消息。
DefaultTbQueueRequestTemplate.send()方法在发送的时候会将当前需要发送的消息存入pendingRequests对象,以requestId为唯一ID,将返回的Future装箱之后作为value。当从topical中获取到消息,且ID可以对应上的时候会将pendingRequests对应key的value的状态设置为已完成,然后出发回调。
回调之后就可以拿到core返回的认证信息了,这样就完成了一个完整的认证流程。
网友评论