美文网首页
Thingsboard源码探索(2)

Thingsboard源码探索(2)

作者: 哦呵呵_3579 | 来源:发表于2020-08-10 17:21 被阅读0次

上一篇简单分析了mqtt-transport的代码,说明了设备连接的流程,相对的设备状态上报的流程基本一致,所以就不再自己分析了,有需要的小伙伴可以自己去研究一下。

接下来准备探索core和rule-engine的源码

虽然官网上把它分成了两个独立的部分,但由于这两个服务之间有一些交叉的功能,所以被直接写在了一个服务里面,关键代码都在application这个服务里面。

这个module里面有两个启动项,分别是ThingsboardInstallApplication和ThingsboardServerApplication,其中ThingsboardInstallApplication这个启动项对应的是tb的安装服务,主要是初始化数据库相关的信息。而ThingsboardServerApplication则对应core和rule-engine服务。打包的时候会打包在一起,然后通过启动类的方式来显示的指定具体要启动的服务。

可以看到tb其实做的确实很到位,连数据库的初始化也包装成了一个服务,只需要通过启动参数的方式就可以完成整个数据库的初始化,而不需要自己去跑脚本,而且连数据库的升级也封装了进去,这个确实很赞!

说完了一堆废话,我们转回正题,开始分析core和rule-engine服务,先看一下大致的包结构,主要有一下这些包(config、utils之类的包就不深入研究了)

actors
controller
install
service

主要有一些难度的是actor这个包,所以这次就针对这个Actor模型来进行一次详细的分析。

Actor详解

Actor模型是TB处理消息的核心机制,具体的Actor模型的相关知识可以自行百度,这里这谈TB的Actor设计

1、由org.thingsboard.server.actors.service.DefaultActorService进行Actor系统的初始化

    @PostConstruct
    public void initActorSystem() {
        log.info("Initializing actor system.");
        actorContext.setActorService(this);
        TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
        system = new DefaultTbActorSystem(settings);

        system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
        system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
        system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
        system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));

        actorContext.setActorSystem(system);

        appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));

        // 将appActor作为根消息传递的代理actor
        actorContext.setAppActor(appActor);

        TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));

        // 将statsActor作为根统计数据的代理
        actorContext.setStatsActor(statsActor);

        log.info("Actor system initialized.");
    }

2、初始化的时候会先创建一个Actor系统(org.thingsboard.server.actors.DefaultTbActorSystem)并设置他的一些线程池以及Actor系统的上下文(org.thingsboard.server.actors.ActorSystemContext)。

3、接下来会创建一个根Actor(org.thingsboard.server.actors.app.AppActor),并将其设置为Actor系统的根Actor,这个Actor是所有消息的入口,是其他所有Actor的Parent。

4、初始化完成之后,通过Spring的ApplicationReadyEvent事件,DefaultActorService会往DefaultTbActorSystem的根Actor的Mail里面发送一条AppInitMsg消息。

5、AppActor会从它的Mail里面把消息拿出来,通过process方法进行处理。

   @Override
    protected boolean doProcess(TbActorMsg msg) {
        if (!ruleChainsInitialized) {
            initTenantActors();
            ruleChainsInitialized = true;
            if (msg.getMsgType() != MsgType.APP_INIT_MSG) {
                log.warn("Rule Chains initialized by unexpected message: {}", msg);
            }
        }
        switch (msg.getMsgType()) {
            case APP_INIT_MSG:
                break;
            case PARTITION_CHANGE_MSG:
                ctx.broadcastToChildren(msg);
                break;
            case COMPONENT_LIFE_CYCLE_MSG:
                onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
                break;
            case QUEUE_TO_RULE_ENGINE_MSG:
                onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
                break;
            case TRANSPORT_TO_DEVICE_ACTOR_MSG:
                onToDeviceActorMsg((TenantAwareMsg) msg, false);
                break;
            case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
            case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
                onToDeviceActorMsg((TenantAwareMsg) msg, true);
                break;
            default:
                return false;
        }
        return true;
    }

5、第一次收到消息的时候会先进行TenantActor的初始化,从数据库的tenant表里面拿出所有的数据,并进行加载,每一个Tenant对应一个TenantActor。

    private void initTenantActors() {
        log.info("Starting main system actor.");
        try {
            // This Service may be started for specific tenant only.
            Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
            if (isolatedTenantId.isPresent()) {
                Tenant tenant = systemContext.getTenantService().findTenantById(isolatedTenantId.get());
                if (tenant != null) {
                    log.debug("[{}] Creating tenant actor", tenant.getId());
                    getOrCreateTenantActor(tenant.getId());
                    log.debug("Tenant actor created.");
                } else {
                    log.error("[{}] Tenant with such ID does not exist", isolatedTenantId.get());
                }
            } else if (systemContext.isTenantComponentsInitEnabled()) {
                //分页查询的方式加载租户信息,每个租户会对应一个TenantActor
                PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
                boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
                boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
                for (Tenant tenant : tenantIterator) {
                    if (isCore || (isRuleEngine && !tenant.isIsolatedTbRuleEngine())) {
                        log.debug("[{}] Creating tenant actor", tenant.getId());
                        getOrCreateTenantActor(tenant.getId());
                        log.debug("[{}] Tenant actor created.", tenant.getId());
                    }
                }
            }
            log.info("Main system actor started.");
        } catch (Exception e) {
            log.warn("Unknown failure", e);
        }
    }

6、初始化TenantActor的时候同时会初始化其对应的RuleChainActor

    @Override
    public void init(TbActorCtx ctx) throws TbActorException {
        super.init(ctx);
        log.info("[{}] Starting tenant actor.", tenantId);
        try {
            Tenant tenant = systemContext.getTenantService().findTenantById(tenantId);
            if (tenant == null) {
                cantFindTenant = true;
                log.info("[{}] Started tenant actor for missing tenant.", tenantId);
            } else {
                // This Service may be started for specific tenant only.
                Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();

                isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
                isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);

                if (isRuleEngineForCurrentTenant) {
                    try {
                        if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenant.isIsolatedTbRuleEngine())) {
                            log.info("[{}] Going to init rule chains", tenantId);
                            initRuleChains();
                        } else {
                            isRuleEngineForCurrentTenant = false;
                        }
                    } catch (Exception e) {
                        cantFindTenant = true;
                    }
                }
                log.info("[{}] Tenant actor started.", tenantId);
            }
        } catch (Exception e) {
            log.warn("[{}] Unknown failure", tenantId, e);
//            TODO: throw this in 3.1?
//            throw new TbActorException("Failed to init actor", e);
        }
    }

    protected void initRuleChains() {
        for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChains(tenantId, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
            RuleChainId ruleChainId = ruleChain.getId();
            log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
            TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
            visit(ruleChain, actorRef);
            log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
        }
    }

7、由于一个Tenant对应多个Rule Chain,并且有一个RootChain,所以TenantActor初始化的时候会将其RootChain赋给rootChain,并将RuleChainActor的Mail代理给到rootChainActor。

8、RuleChainActor初始化的时候同时会初始化其对应的RuleChainActorMessageProcessor,RuleChainActorMessageProcessor会负责消息的具体处理。

9、RuleChainActorMessageProcessor初始化的时候会初始化RuleChain对应的RuleNodeActor,主要通过rule_node 和 relation这两张表中的数据进行排序,每一个RuleNode都会对应一个RuleNodeActor。

    @Override
    public void start(TbActorCtx context) {
        if (!started) {
            RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
            if (ruleChain != null) {
                List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);
                log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
                // Creating and starting the actors;
                for (RuleNode ruleNode : ruleNodeList) {
                    log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
                    TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
                    //RuleNodeCtx对象包含了self这个对rule chain的Mail引用,所有的rule node可以通过self的方式将数据给到RuleChainActor
                    nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
                }
                initRoutes(ruleChain, ruleNodeList);
                started = true;
            }
        } else {
            onUpdate(context);
        }
    }

    private void initRoutes(RuleChain ruleChain, List<RuleNode> ruleNodeList) {
        nodeRoutes.clear();
        // Populating the routes map;
        for (RuleNode ruleNode : ruleNodeList) {
            List<EntityRelation> relations = service.getRuleNodeRelations(TenantId.SYS_TENANT_ID, ruleNode.getId());
            log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());
            if (relations.size() == 0) {
                nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
            } else {
                for (EntityRelation relation : relations) {
                    log.trace("[{}][{}][{}] Processing rule node relation [{}]", tenantId, entityId, ruleNode.getId(), relation.getTo());
                    //校验rule node与relation中的是否对应
                    if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
                        RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
                        if (ruleNodeCtx == null) {
                            throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
                        }
                    }
                    nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
                            .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
                }
            }
        }

        firstId = ruleChain.getFirstRuleNodeId();
        firstNode = nodeActors.get(firstId);
        state = ComponentLifecycleState.ACTIVE;
    }

10、RuleNodeActor初始化的时候会创建对应的RuleNodeActorMessageProcessor,RuleNodeActorMessageProcessor初始化的时候会通过反射的方式拿到RuleNode对应的TbNode接口实现类,消息实际会通过这个实现类来进行处理。

    private TbNode initComponent(RuleNode ruleNode) throws Exception {
        TbNode tbNode = null;
        if (ruleNode != null) {
            Class<?> componentClazz = Class.forName(ruleNode.getType());
            tbNode = (TbNode) (componentClazz.newInstance());
            tbNode.init(defaultCtx, new TbNodeConfiguration(ruleNode.getConfiguration()));
        }
        return tbNode;
    }

至此,Actor系统就初始化完成了,整理一下可以得到这样的一张图,左边是流程图,右边是结构图


tb-actor.png

Actor消息处理详解

这里以设备状态上报为例来进行分析

1、org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService.launchConsumer()方法接收到来自Transport的消息,并将消息丢到Actor系统上下文ActorSystemContext中。

    private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
        TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
        QueueToRuleEngineMsg msg;
        ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
        Set<String> relationTypes = null;
        if (relationTypesList != null) {
            if (relationTypesList.size() == 1) {
                relationTypes = Collections.singleton(relationTypesList.get(0));
            } else {
                relationTypes = new HashSet<>(relationTypesList);
            }
        }
        msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());
        actorContext.tell(msg);
    }

2、ActorSystemContext把数据发送到根Actor(即AppActor)的邮箱中。

public void tell(TbActorMsg tbActorMsg) {
        appActor.tell(tbActorMsg);
    }

3、AppActor从邮箱中收到消息然后通过process方法进行处理,根据消息类型,会找到对应设备的TenantActor并将消息投递到期邮箱中进行下一步的处理。

    private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
        if (SYSTEM_TENANT.equals(msg.getTenantId())) {
            msg.getTbMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
        } else {
            if (!deletedTenants.contains(msg.getTenantId())) {
                getOrCreateTenantActor(msg.getTenantId()).tell(msg);
            } else {
                msg.getTbMsg().getCallback().onSuccess();
            }
        }
    }

4、TenantActor从邮箱中拿到数据并通过process方法进行处理,他会首先去寻找其root chain actor,并将消息投递到Rule Chain Actor对应的TbActorMailbox邮箱中进行处理。

    private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
        if (!isRuleEngineForCurrentTenant) {
            log.warn("RECEIVED INVALID MESSAGE: {}", msg);
            return;
        }
        TbMsg tbMsg = msg.getTbMsg();
        if (tbMsg.getRuleChainId() == null) {
            if (getRootChainActor() != null) {
                getRootChainActor().tell(msg);
            } else {
                tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));
                log.info("[{}] No Root Chain: {}", tenantId, msg);
            }
        } else {
            try {
                ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
            } catch (TbActorNotRegisteredException ex) {
                log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId());
                //TODO: 3.1 Log it to dead letters queue;
                tbMsg.getCallback().onSuccess();
            }
        }
    }

5、RuleChainActor从邮箱中拿到数据之后通过process方法将消息丢给它的RuleChainActorMessageProcessor进行处理。

    @Override
    protected boolean doProcess(TbActorMsg msg) {
        switch (msg.getMsgType()) {
            case COMPONENT_LIFE_CYCLE_MSG:
                onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
                break;
            case QUEUE_TO_RULE_ENGINE_MSG:
                processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
.....

6、RuleChainActorMessageProcessor接收到数据之后,会先确认有没有指定处理的node,如果没有就将其给到这个RuleChain对应的第一个RuleNodeActor进行处理。

   void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
        TbMsg msg = envelope.getTbMsg();
        log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);
        if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
            try {
                checkActive(envelope.getTbMsg());
                //从 rule chain对象中获取到第一个rule node的id
                RuleNodeId targetId = msg.getRuleNodeId();
                RuleNodeCtx targetCtx;
                //正常情况下没有指定的话会是个null
                if (targetId == null) {
                    targetCtx = firstNode;
                    msg = msg.copyWithRuleChainId(entityId);
                } else {
                    targetCtx = nodeActors.get(targetId);
                }
                if (targetCtx != null) {
                    log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId);
                    pushMsgToNode(targetCtx, msg, "");
                } else {
                    //当rule node 不存在的时候直接返回成功结果
                    log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId);
                    msg.getCallback().onSuccess();
                }
            } catch (RuleNodeException rne) {
                envelope.getTbMsg().getCallback().onFailure(rne);
            } catch (Exception e) {
                envelope.getTbMsg().getCallback().onFailure(new RuleEngineException(e.getMessage()));
            }
        } else {
            onTellNext(envelope.getTbMsg(), envelope.getTbMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());
        }
    }

7、RuleNodeActor从邮箱中收到消息之后会通过process方法,将数据发到RuleNodeActorMessageProcessor进行进一步的处理。

8、RuleNodeActorMessageProcessor会将消息丢给TbNode的实现类进行数据的最后处理。

    void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
        checkActive(msg.getMsg());
        if (ruleNode.isDebugMode()) {
            systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
        }
        try {
            tbNode.onMsg(msg.getCtx(), msg.getMsg());
        } catch (Exception e) {
            msg.getCtx().tellFailure(msg.getMsg(), e);
        }
    }

9、TbNode的实现类处理完之后会调用DefaultTbContext,告知处理结果,然后会根据返回结果重新拼装消息,并将消息传递回RuleChainActor。

    private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
        if (nodeCtx.getSelf().isDebugMode()) {
            relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
        }
        nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
    }

10、RuleChainActor接收到消息,由于消息类型发生了变更,所有会调用其对应的RuleChainActorMessageProcessor的onTellNext方法进行下一步处理。

    @Override
    protected boolean doProcess(TbActorMsg msg) {
        switch (msg.getMsgType()) {
            case COMPONENT_LIFE_CYCLE_MSG:
                onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
                break;
            case QUEUE_TO_RULE_ENGINE_MSG:
                processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
                break;
            //消息从rule node发往rule chain中的下一个 rule node
            //rule node会通过 DefaultTbContext 获取rule chain中的下一个node,并将消息进行投递
            case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
                processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
                break;
            case RULE_CHAIN_TO_RULE_CHAIN_MSG:
                processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
                break;
            case PARTITION_CHANGE_MSG:
                processor.onPartitionChangeMsg((PartitionChangeMsg) msg);
                break;
            case STATS_PERSIST_TICK_MSG:
                onStatsPersistTick(id);
                break;
            default:
                return false;
        }
        return true;
    }

11、RuleChainActorMessageProcessor会根据rule node的关联关系找到下一个处理消息的RuleNodeActor,当没有找到的时候则说明这条数据已经处理完毕。如果可以找到则丢到pushToTarget()这个方法进行处理。

 private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
        try {
            checkActive(msg);
            EntityId entityId = msg.getOriginator();
            TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
            //根据 relationTypes过滤出下一个target
            List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
                    .filter(r -> contains(relationTypes, r.getType()))
                    .collect(Collectors.toList());
            int relationsCount = relations.size();
            if (relationsCount == 0) {
                log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
                if (relationTypes.contains(TbRelationTypes.FAILURE)) {
                    RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
                    if (ruleNodeCtx != null) {
                        msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
                    } else {
                        log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
                        msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
                    }
                } else {
                    msg.getCallback().onSuccess();
                }
            } else if (relationsCount == 1) {
                for (RuleNodeRelation relation : relations) {
                    log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
                    pushToTarget(tpi, msg, relation.getOut(), relation.getType());
                }
            } else {
                MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
                log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations);
                for (RuleNodeRelation relation : relations) {
                    EntityId target = relation.getOut();
                    putToQueue(tpi, msg, callbackWrapper, target);
                }
            }
        } catch (RuleNodeException rne) {
            msg.getCallback().onFailure(rne);
        } catch (Exception e) {
            msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
        }
    }

12、服务下一个target的类型是RULE_NODE则将消息发送给它,如果是RULE_CHAIN则重新组装消息,并通知RuleChainActor对应的TenantActor进行处理。

    private void pushToTarget(TopicPartitionInfo tpi, TbMsg msg, EntityId target, String fromRelationType) {
        if (tpi.isMyPartition()) {
            switch (target.getEntityType()) {
                case RULE_NODE:
                    pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);
                    break;
                case RULE_CHAIN:
                    parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType));
                    break;
            }
        } else {
            putToQueue(tpi, msg, new TbQueueTbMsgCallbackWrapper(msg.getCallback()), target);
        }
    }

13、TenantActor收到来自RuleChainActor发来的消息之后会寻找下一个RuleChainActor来处理消息。

   private void onRuleChainMsg(RuleChainAwareMsg msg) {
        getOrCreateActor(msg.getRuleChainId()).tell(msg);
    }

至此整个消息的处理流程大致梳理了一下,至于消息在TenantActor、RuleChainActor和RuleNodeActor之间多次的流转就不再赘述了。

有空会补一张流程图再进行梳理一遍。

相关文章

网友评论

      本文标题:Thingsboard源码探索(2)

      本文链接:https://www.haomeiwen.com/subject/fugprktx.html