美文网首页thingsboard
thingsboard源码解析(1)--Actor系统流程

thingsboard源码解析(1)--Actor系统流程

作者: 永和包仔 | 来源:发表于2022-03-17 16:46 被阅读0次

    1. application 的目录架构

    └── java
        └── org/thingsboard/server
            ├── ThingsboardInstallApplication.java (程序初始化入口-安装脚本等)
            ├── ThingsboardServerApplication.java (程序服务启动入口)
            ├── actors (Actor模型消息处理核心逻辑)
            ├── config (配置类)
            ├── controller (服务接口层)
            ├── exception (异常)
            ├── install (安装相关)
            ├── service (服务层)
            └── utils (工具类)
    └── resources
        ├── banner.txt (应用启动时的控制台输出logo)
        ├── i18n (国际化相关)
        ├── logback.xml (日志框架的配置文件)
        ├── templates (模版)
        └── thingsboard.yml (配置文件)    
    

    从java的入口中有两个:

    org/thingsboard/server/ThingsboardInstallApplication.java
    org/thingsboard/server/ThingsboardServerApplication.java
    

    其中ThingsboardInstallApplication这个启动项对应的是tb的安装服务,主要是初始化数据库相关的信息;而ThingsboardServerApplication则对应core和rule-engine服务。IDE导入源码后,通过不同的启动类的方式来启动指定具体要启动的服务。

    2. 项目初始化

    由于项目是springboot 框架,在实际项目中,服务启动后做一些初始化工作,例如线程池初始化、文件资源加载、常驻后台任务启动(比如kafka consumer)等。一般初始化资源的方法有3类

    • Spring Bean初始化的InitializingBean,init-method和PostConstruct
    • ApplicationRunner与CommandLineRunner接口
    • Spring的事件机制

    所以先搜索
    @EventListener看看如下图

    image.png

    @PostConstruct看看如下图

    image.png

    看到log.info("Initializing actor system.");的注释,初步怀疑从这里开始。所以进入到这个函数再细看下:

        public void initActorSystem() {
            log.info("Initializing actor system.");
            actorContext.setActorService(this);  //初始化上下文
            TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
            system = new DefaultTbActorSystem(settings);//初始化系统,主要初始化任务执行扔线程池
            
            //初始化派发
            system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
            system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
            system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
            system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));
    
            actorContext.setActorSystem(system);
    
            appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
            actorContext.setAppActor(appActor);
    
            TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));
            actorContext.setStatsActor(statsActor);
    
            log.info("Actor system initialized.");
        }
    

    DefaultActorService 完成了acotr的初始化:

    • 1.初始化上下文actorContext
    • 2.创建DefaultTbActorSystem,初始化任务执行的线程池;
    • 3.创建AppAcotr system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));, 这个是系统的根,是其它actor的parent actor, 即所有消息需要通过该actor;
      • 3.1.创建 AppActor的同时,需要创建对应的TbActorMailbox并与actor进行关联
      • 3.2.mailbox.initActor()至 AppActor 的init()方法
      • 3.3.mailbox 调用到 对应的Actor 的protected abstract boolean doProcess(TbActorMsg msg); 的实现, 即返回到AppActor的doProcess处理进来的消息
    • 4.AppActor的 doProcess处理时,调用initTenantActors()初始化租户Actor(TenantActor)
    • 5.调用至ctx.getOrCreateChildActor, ctx.getOrCreateChildActor中的cxt就是AppActor的mailbox引用,实际还是调回了system的TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent)这个方法,接下来同appActor创建即第3步骤一致
      • 5.1.创建 TenantActor的同时,需要创建对应的TbActorMailbox并与actor进行关联
      • 5.2.mailbox.initActor() 至 TenantActor的init()方法,里面调用了initRuleChains();
      • 5.3. TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);根据ruleChain创建RuleChainActor,这里又回到Actor的创建步骤。 一个Tenant对应多个RuleChain,并且有一个RootChain,所以TenantActor在initRuleChains()时候将其RootChain赋给rootChain,并将RuleChainActor的MailBox赋予rootChainActor;
      • 5.3中RuleChainActor的创建的会调用到RuleChainActor的init()方法,调用了createProcessor(ctx);initProcessor(ctx); 用来创建RuleChainActorMessageProcessor并开始用来处理RuleChainActor的消息processor.start(ctx);
      • 5.4. 在processor.start(ctx); RuleChainActorMessageProcessor 的 start方法又以RuleChain创建对应的RuleNodeActor,主要通过rule_noderelation这两张表中的数据进行排序,一个RuleNode对应一个 RuleNodeActor。
      • 创建 RuleNodeActor 又创建一个对应RuleNodeActorMessageProcessor来处理节点的状态变化,RuleNodeActorMessageProcessor初始化的时候tbNode = initComponent(ruleNode);会通过反射的方式拿到RuleNode对应的TbNode接口实现类,消息实际会通过这个实现类来进行处理。
        至此,Actor初始化完成。总结下关键点就是 system, Actor, mailbox 几个类, system创建actor和actor对应的mailbox, 并通过 actor的处理消息的doProcess方法层层递进创建 TenantActor、RuleChainActor、RuleNodeActor、TbNode直至末稍实现类
        @Override
        protected boolean doProcess(TbActorMsg msg) {
            if (!ruleChainsInitialized) {
                "initTenantActors();"//初始化租户Actor
                ruleChainsInitialized = true;
                if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) {
                    log.warn("Rule Chains initialized by unexpected message: {}", msg);
                }
            }
    ......
    
        private void initTenantActors() {
            log.info("Starting main system actor.");
            try {
                // This Service may be started for specific tenant only.
                Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
                if (isolatedTenantId.isPresent()) {
                    Tenant tenant = systemContext.getTenantService().findTenantById(isolatedTenantId.get());
                    if (tenant != null) {
                        log.debug("[{}] Creating tenant actor", tenant.getId());
                        getOrCreateTenantActor(tenant.getId());
                        log.debug("Tenant actor created.");
                    } else {
                        log.error("[{}] Tenant with such ID does not exist", isolatedTenantId.get());
                    }
                } else if (systemContext.isTenantComponentsInitEnabled()) {
                    PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
                    boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
                    boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
                    for (Tenant tenant : tenantIterator) {
                        TenantProfile tenantProfile = tenantProfileCache.get(tenant.getTenantProfileId());
                        if (isCore || (isRuleEngine && !tenantProfile.isIsolatedTbRuleEngine())) {
                            log.debug("[{}] Creating tenant actor", tenant.getId());
                            getOrCreateTenantActor(tenant.getId());
                            log.debug("[{}] Tenant actor created.", tenant.getId());
                        }
                    }
                }
                log.info("Main system actor started.");
            } catch (Exception e) {
                log.warn("Unknown failure", e);
            }
        }
    
        private TbActorRef getOrCreateTenantActor(TenantId tenantId) {
            return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId),
                    () -> DefaultActorService.TENANT_DISPATCHER_NAME,
                    () -> new TenantActor.ActorCreator(systemContext, tenantId));
        }
    
    
    image.png

    相关文章

      网友评论

        本文标题:thingsboard源码解析(1)--Actor系统流程

        本文链接:https://www.haomeiwen.com/subject/vtxwdrtx.html