1. application
的目录架构
└── java
└── org/thingsboard/server
├── ThingsboardInstallApplication.java (程序初始化入口-安装脚本等)
├── ThingsboardServerApplication.java (程序服务启动入口)
├── actors (Actor模型消息处理核心逻辑)
├── config (配置类)
├── controller (服务接口层)
├── exception (异常)
├── install (安装相关)
├── service (服务层)
└── utils (工具类)
└── resources
├── banner.txt (应用启动时的控制台输出logo)
├── i18n (国际化相关)
├── logback.xml (日志框架的配置文件)
├── templates (模版)
└── thingsboard.yml (配置文件)
从java的入口中有两个:
org/thingsboard/server/ThingsboardInstallApplication.java
org/thingsboard/server/ThingsboardServerApplication.java
其中ThingsboardInstallApplication这个启动项对应的是tb的安装服务,主要是初始化数据库相关的信息;而ThingsboardServerApplication则对应core和rule-engine服务。IDE导入源码后,通过不同的启动类的方式来启动指定具体要启动的服务。
2. 项目初始化
由于项目是springboot 框架,在实际项目中,服务启动后做一些初始化工作,例如线程池初始化、文件资源加载、常驻后台任务启动(比如kafka consumer)等。一般初始化资源的方法有3类
- Spring Bean初始化的InitializingBean,init-method和PostConstruct
- ApplicationRunner与CommandLineRunner接口
- Spring的事件机制
所以先搜索
@EventListener
看看如下图
@PostConstruct
看看如下图
看到log.info("Initializing actor system.");
的注释,初步怀疑从这里开始。所以进入到这个函数再细看下:
public void initActorSystem() {
log.info("Initializing actor system.");
actorContext.setActorService(this); //初始化上下文
TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
system = new DefaultTbActorSystem(settings);//初始化系统,主要初始化任务执行扔线程池
//初始化派发
system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));
actorContext.setActorSystem(system);
appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
actorContext.setAppActor(appActor);
TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));
actorContext.setStatsActor(statsActor);
log.info("Actor system initialized.");
}
DefaultActorService 完成了acotr的初始化:
- 1.初始化上下文
actorContext
; - 2.创建
DefaultTbActorSystem
,初始化任务执行的线程池; - 3.创建AppAcotr
system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
, 这个是系统的根,是其它actor的parent actor, 即所有消息需要通过该actor;- 3.1.创建 AppActor的同时,需要创建对应的
TbActorMailbox
并与actor进行关联 - 3.2.
mailbox.initActor()
至 AppActor 的init()方法 - 3.3.mailbox 调用到 对应的Actor 的
protected abstract boolean doProcess(TbActorMsg msg);
的实现, 即返回到AppActor的doProcess
处理进来的消息
- 3.1.创建 AppActor的同时,需要创建对应的
- 4.AppActor的
doProcess
处理时,调用initTenantActors()
初始化租户Actor(TenantActor) - 5.调用至
ctx.getOrCreateChildActor
,ctx.getOrCreateChildActor
中的cxt就是AppActor的mailbox引用,实际还是调回了system的TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent)
这个方法,接下来同appActor创建即第3步骤一致- 5.1.创建 TenantActor的同时,需要创建对应的
TbActorMailbox
并与actor进行关联 - 5.2.
mailbox.initActor()
至 TenantActor的init()
方法,里面调用了initRuleChains();
- 5.3.
TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
根据ruleChain创建RuleChainActor,这里又回到Actor的创建步骤。 一个Tenant对应多个RuleChain,并且有一个RootChain,所以TenantActor在initRuleChains()时候将其RootChain赋给rootChain,并将RuleChainActor的MailBox赋予rootChainActor; - 5.3中RuleChainActor的创建的会调用到RuleChainActor的init()方法,调用了
createProcessor(ctx);
和initProcessor(ctx); 用来创建
RuleChainActorMessageProcessor并开始用来处理RuleChainActor的消息processor.start(ctx);
- 5.4. 在
processor.start(ctx);
RuleChainActorMessageProcessor 的 start方法又以RuleChain创建对应的RuleNodeActor,主要通过rule_node
和relation
这两张表中的数据进行排序,一个RuleNode对应一个 RuleNodeActor。 - 创建 RuleNodeActor 又创建一个对应RuleNodeActorMessageProcessor来处理节点的状态变化,RuleNodeActorMessageProcessor初始化的时候
tbNode = initComponent(ruleNode);
会通过反射的方式拿到RuleNode对应的TbNode接口实现类,消息实际会通过这个实现类来进行处理。
至此,Actor初始化完成。总结下关键点就是 system, Actor, mailbox 几个类, system创建actor和actor对应的mailbox, 并通过 actor的处理消息的doProcess方法层层递进创建 TenantActor、RuleChainActor、RuleNodeActor、TbNode直至末稍实现类
- 5.1.创建 TenantActor的同时,需要创建对应的
@Override
protected boolean doProcess(TbActorMsg msg) {
if (!ruleChainsInitialized) {
"initTenantActors();"//初始化租户Actor
ruleChainsInitialized = true;
if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) {
log.warn("Rule Chains initialized by unexpected message: {}", msg);
}
}
......
private void initTenantActors() {
log.info("Starting main system actor.");
try {
// This Service may be started for specific tenant only.
Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
if (isolatedTenantId.isPresent()) {
Tenant tenant = systemContext.getTenantService().findTenantById(isolatedTenantId.get());
if (tenant != null) {
log.debug("[{}] Creating tenant actor", tenant.getId());
getOrCreateTenantActor(tenant.getId());
log.debug("Tenant actor created.");
} else {
log.error("[{}] Tenant with such ID does not exist", isolatedTenantId.get());
}
} else if (systemContext.isTenantComponentsInitEnabled()) {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
for (Tenant tenant : tenantIterator) {
TenantProfile tenantProfile = tenantProfileCache.get(tenant.getTenantProfileId());
if (isCore || (isRuleEngine && !tenantProfile.isIsolatedTbRuleEngine())) {
log.debug("[{}] Creating tenant actor", tenant.getId());
getOrCreateTenantActor(tenant.getId());
log.debug("[{}] Tenant actor created.", tenant.getId());
}
}
}
log.info("Main system actor started.");
} catch (Exception e) {
log.warn("Unknown failure", e);
}
}
private TbActorRef getOrCreateTenantActor(TenantId tenantId) {
return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId),
() -> DefaultActorService.TENANT_DISPATCHER_NAME,
() -> new TenantActor.ActorCreator(systemContext, tenantId));
}
image.png
网友评论