因为可以使用多种消息队列,所以对生产消费以及主题做了抽象。
TopicPartitionInfo类,就是抽象的主题类。
有5个字段:
private final String topic;
private final TenantId tenantId;
private final Integer partition;
@Getter
private final String fullTopicName;
@Getter
private final boolean myPartition;
- topic字段
有配置的,例如:
tb_core
tb_rule_engine.main
tb_rule_engine.hp
tb_rule_engine.sq
有拼接的,例如通知类的主题:
serviceType.name().toLowerCase() + ".notifications." + serviceId
拼接后就是这个样子:
tb_transport.notifications.Win102021VRAHVU - tenantId字段
使用单独服务的租户,这个字段才不为空。 - partition字段
实体id(可能是设备id、租户id)哈希取余后的分区。
通知类的主题这个字段为空。 - fullTopicName字段
这个类最重要的字段,表示完整的主题名,发消息时就往这个主题发。
它是topic+tenantId(非空)+partition(非空),三个字段的拼接(如果tenantId、partition都非空)。
例如:
tb_core.2(租户公用)
tb_core.3dba9880-3971-11ed-aeb4-c31e798a06ec.5(单独给3dba9880-3971-11ed-aeb4-c31e798a06ec这个租户使用) - myPartition字段
对于partition字段不为空的主题,如果这个主题归当前服务消费,那这个字段就为true,否则为false。
服务怎么判断自己消费哪些分区呢?
那就需要通过zookeeper知道,跟当前服务同种类型的服务有几个。例如tb_core服务,假如zookeeper中注册了3个tb_core服务,而tb_core主题的分区有10个,那么第一个服务消费0、3、6、9四个分区,第二个服务消费1、4、7、10四个分区,第三个服务消费2、5、8三个分区。
其实就是按partitionIdx % servers.size()取余计算,落到哪个服务就属于哪个服务消费。
当同类型服务数量发生变化时,thingsboard会借助spring的发布订阅机制,将本服务消费分区变化的信息通知给consumer,然后consumer重新订阅,并拉取对应主题的消息来消费。
网友评论