ThingsBoard source code analysis: how the ts_kv table is updated

Author: M_lear | Published 2022-07-04 15:18

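    ThingsBoard makes heavy use of queues: tasks are not processed directly but placed on a queue and executed by a consumer.

    Updates to the ts_kv table likewise go through a queue.

    The DB update logic for ts_kv lives in PsqlInsertTsRepository. Its SQL is an upsert: on a conflict on (entity_id, key, ts) the value columns are overwritten, which is why each value is bound twice (parameters 4 to 8 for the INSERT and 9 to 13 for the UPDATE).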

    @SqlTsDao
    @PsqlDao
    @Repository
    @Transactional
    public class PsqlInsertTsRepository extends AbstractInsertRepository implements InsertTsRepository<TsKvEntity> {
    
        private static final String INSERT_ON_CONFLICT_DO_UPDATE = "INSERT INTO ts_kv (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES (?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " +
                "ON CONFLICT (entity_id, key, ts) DO UPDATE SET bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json);";
    
        @Override
        public void saveOrUpdate(List<TsKvEntity> entities) {
            jdbcTemplate.batchUpdate(INSERT_ON_CONFLICT_DO_UPDATE, new BatchPreparedStatementSetter() {
                @Override
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                    TsKvEntity tsKvEntity = entities.get(i);
                    ps.setObject(1, tsKvEntity.getEntityId());
                    ps.setInt(2, tsKvEntity.getKey());
                    ps.setLong(3, tsKvEntity.getTs());
    
                    if (tsKvEntity.getBooleanValue() != null) {
                        ps.setBoolean(4, tsKvEntity.getBooleanValue());
                        ps.setBoolean(9, tsKvEntity.getBooleanValue());
                    } else {
                        ps.setNull(4, Types.BOOLEAN);
                        ps.setNull(9, Types.BOOLEAN);
                    }
    
                    ps.setString(5, replaceNullChars(tsKvEntity.getStrValue()));
                    ps.setString(10, replaceNullChars(tsKvEntity.getStrValue()));
    
    
                    if (tsKvEntity.getLongValue() != null) {
                        ps.setLong(6, tsKvEntity.getLongValue());
                        ps.setLong(11, tsKvEntity.getLongValue());
                    } else {
                        ps.setNull(6, Types.BIGINT);
                        ps.setNull(11, Types.BIGINT);
                    }
    
                    if (tsKvEntity.getDoubleValue() != null) {
                        ps.setDouble(7, tsKvEntity.getDoubleValue());
                        ps.setDouble(12, tsKvEntity.getDoubleValue());
                    } else {
                        ps.setNull(7, Types.DOUBLE);
                        ps.setNull(12, Types.DOUBLE);
                    }
    
                    ps.setString(8, replaceNullChars(tsKvEntity.getJsonValue()));
                    ps.setString(13, replaceNullChars(tsKvEntity.getJsonValue()));
                }
    
                @Override
                public int getBatchSize() {
                    return entities.size();
                }
            });
        }
    
    }
    

    1. Enqueuing the entity

    The update is submitted in the save method of JpaPsqlTimeseriesDao:

        @Override
        public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
            int dataPointDays = getDataPointDays(tsKvEntry, computeTtl(ttl));
            savePartitionIfNotExist(tsKvEntry.getTs());
            String strKey = tsKvEntry.getKey();
            Integer keyId = getOrSaveKeyId(strKey);
            TsKvEntity entity = new TsKvEntity();
            entity.setEntityId(entityId.getId());
            entity.setTs(tsKvEntry.getTs());
            entity.setKey(keyId);
            entity.setStrValue(tsKvEntry.getStrValue().orElse(null));
            entity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null));
            entity.setLongValue(tsKvEntry.getLongValue().orElse(null));
            entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
            entity.setJsonValue(tsKvEntry.getJsonValue().orElse(null));
            log.trace("Saving entity: {}", entity);
            return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor());
        }
    

    The final call, tsQueue.add(entity), puts the entity that is to be written to the DB onto the queue.

    The add method of tsQueue:

        public ListenableFuture<Void> add(E element) {
            int queueIndex = element != null ? (hashCodeFunction.apply(element) & 0x7FFFFFFF) % maxThreads : 0;
            return queues.get(queueIndex).add(element);
        }
    

    queues is a list of queues:

    private final CopyOnWriteArrayList<TbSqlBlockingQueue<E>> queues = new CopyOnWriteArrayList<>();
    

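    The add method hashes the incoming entity and stores it in one of the queues in the list. The sign bit of the hash is masked off before the modulus, so the index is never negative; a null element always goes to queue 0.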
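
    The index calculation can be tried in isolation. The following is a minimal sketch that copies the formula from add; the class name QueueRoutingSketch and the value maxThreads = 4 are assumptions made only for this illustration. It shows that a negative hash still maps to a valid index and that the same key always lands in the same queue.

```java
import java.util.UUID;
import java.util.function.Function;

final class QueueRoutingSketch {

    // Same formula as TbSqlBlockingQueueWrapper.add: clear the sign bit, then take the modulus.
    static <E> int queueIndex(E element, Function<E, Integer> hashCodeFunction, int maxThreads) {
        return element != null ? (hashCodeFunction.apply(element) & 0x7FFFFFFF) % maxThreads : 0;
    }

    public static void main(String[] args) {
        int maxThreads = 4; // hypothetical value, configurable in the real code

        // The same key is always routed to the same queue.
        UUID entityId = UUID.randomUUID();
        int first = queueIndex(entityId, UUID::hashCode, maxThreads);
        int second = queueIndex(entityId, UUID::hashCode, maxThreads);
        System.out.println(first == second); // true

        // A plain "hash % maxThreads" would give -3 here; masking gives a usable index.
        System.out.println(queueIndex(-7, x -> x, maxThreads)); // 1
    }
}
```

    Masking with 0x7FFFFFFF also handles Integer.MIN_VALUE, for which Math.abs would still return a negative number.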

    2. Dequeuing the entity and writing it to the DB

    tsQueue is defined in the parent class AbstractChunkedAggregationTimeseriesDao.

    public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSqlTimeseriesDao implements TimeseriesDao {
    
        protected TbSqlBlockingQueueWrapper<TsKvEntity> tsQueue;
        // remaining fields omitted
    
        @PostConstruct
        protected void init() {
            TbSqlBlockingQueueParams tsParams = TbSqlBlockingQueueParams.builder()
                    .logName("TS")
                    .batchSize(tsBatchSize)
                    .maxDelay(tsMaxDelay)
                    .statsPrintIntervalMs(tsStatsPrintIntervalMs)
                    .statsNamePrefix("ts")
                    .batchSortEnabled(batchSortEnabled)
                    .build();
    
            Function<TsKvEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode();
            tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, tsBatchThreads, statsFactory);
            // The second argument wraps the saveOrUpdate method in a Consumer
            tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v),
                    Comparator.comparing((Function<TsKvEntity, UUID>) AbstractTsKvEntity::getEntityId)
                            .thenComparing(AbstractTsKvEntity::getKey)
                            .thenComparing(AbstractTsKvEntity::getTs)
                    );
        }
    
    }
    

    Because init() is annotated with @PostConstruct, it runs automatically once dependency injection for the bean has completed.

    The tsQueue.init method:

    @Slf4j
    @Data
    public class TbSqlBlockingQueueWrapper<E> {
        private final CopyOnWriteArrayList<TbSqlBlockingQueue<E>> queues = new CopyOnWriteArrayList<>();
        private final TbSqlBlockingQueueParams params;
        private ScheduledLogExecutorComponent logExecutor;
        private final Function<E, Integer> hashCodeFunction;
        private final int maxThreads;
        private final StatsFactory statsFactory;
    
        /**
         * Starts TbSqlBlockingQueues.
         *
         * @param  logExecutor  executor that will be printing logs and statistics
         * @param  saveFunction function to save entities in database
         * @param  batchUpdateComparator comparator to sort entities by primary key to avoid deadlocks in cluster mode
         *                               NOTE: you must use all of primary key parts in your comparator
         */
        public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, Comparator<E> batchUpdateComparator) {
            for (int i = 0; i < maxThreads; i++) {
                MessagesStats stats = statsFactory.createMessagesStats(params.getStatsNamePrefix() + ".queue." + i);
                TbSqlBlockingQueue<E> queue = new TbSqlBlockingQueue<>(params, stats);
                queues.add(queue);
                queue.init(logExecutor, saveFunction, batchUpdateComparator, i);
            }
        }
    
        public ListenableFuture<Void> add(E element) {
            int queueIndex = element != null ? (hashCodeFunction.apply(element) & 0x7FFFFFFF) % maxThreads : 0;
            return queues.get(queueIndex).add(element);
        }
    
        public void destroy() {
            queues.forEach(TbSqlBlockingQueue::destroy);
        }
    }
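
    The Javadoc of init above states that the comparator sorts entities by primary key to avoid deadlocks in cluster mode. A minimal sketch of that idea follows, assuming a hypothetical Row record that stands in for TsKvEntity and carries only the primary-key columns (entity_id, key, ts). Two batches that touch the same rows in different arrival order end up in the same order after sorting, so concurrent writers would update rows in a consistent sequence.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;

final class BatchSortSketch {

    // Hypothetical stand-in for TsKvEntity, reduced to its primary-key columns.
    record Row(UUID entityId, int key, long ts) {}

    // Mirrors the comparator passed to tsQueue.init: entityId, then key, then ts.
    static final Comparator<Row> PRIMARY_KEY_ORDER =
            Comparator.comparing(Row::entityId)
                    .thenComparingInt(Row::key)
                    .thenComparingLong(Row::ts);

    // Returns a sorted copy and leaves the incoming batch untouched.
    static List<Row> sortedBatch(List<Row> batch) {
        List<Row> copy = new ArrayList<>(batch);
        copy.sort(PRIMARY_KEY_ORDER);
        return copy;
    }

    public static void main(String[] args) {
        UUID a = new UUID(0L, 1L);
        UUID b = new UUID(0L, 2L);
        // Two nodes receive the same rows in a different order.
        List<Row> nodeOne = List.of(new Row(b, 1, 10L), new Row(a, 1, 10L));
        List<Row> nodeTwo = List.of(new Row(a, 1, 10L), new Row(b, 1, 10L));
        System.out.println(sortedBatch(nodeOne).equals(sortedBatch(nodeTwo))); // true
    }
}
```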
    
    

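    TbSqlBlockingQueueWrapper is a wrapper around TbSqlBlockingQueue.

    It holds a thread-safe list of TbSqlBlockingQueue instances.
    Its init method creates maxThreads (configurable) TbSqlBlockingQueue instances and adds them to the list.

    For each TbSqlBlockingQueue it creates, it also calls that queue's own init method.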

    public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
        private final BlockingQueue<TbSqlQueueElement<E>> queue = new LinkedBlockingQueue<>();
        // other fields omitted
    
        @Override
        public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, Comparator<E> batchUpdateComparator, int index) {
            executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sql-queue-" + index + "-" + params.getLogName().toLowerCase()));
            executor.submit(() -> {
                String logName = params.getLogName();
                int batchSize = params.getBatchSize();
                long maxDelay = params.getMaxDelay();
                List<TbSqlQueueElement<E>> entities = new ArrayList<>(batchSize);
                while (!Thread.interrupted()) {
                    try {
                        long currentTs = System.currentTimeMillis();
                        TbSqlQueueElement<E> attr = queue.poll(maxDelay, TimeUnit.MILLISECONDS);
                        if (attr == null) {
                            continue;
                        } else {
                            entities.add(attr);
                        }
                        queue.drainTo(entities, batchSize - 1);
                        boolean fullPack = entities.size() == batchSize;
                        log.debug("[{}] Going to save {} entities", logName, entities.size());
                        Stream<E> entitiesStream = entities.stream().map(TbSqlQueueElement::getEntity);
                        // The actual DB operation happens here
                        saveFunction.accept(
                                (params.isBatchSortEnabled() ? entitiesStream.sorted(batchUpdateComparator) : entitiesStream)
                                        .collect(Collectors.toList())
                        );
                        entities.forEach(v -> v.getFuture().set(null));
                        stats.incrementSuccessful(entities.size());
                        if (!fullPack) {
                            long remainingDelay = maxDelay - (System.currentTimeMillis() - currentTs);
                            if (remainingDelay > 0) {
                                Thread.sleep(remainingDelay);
                            }
                        }
                    } catch (Exception e) {
                        stats.incrementFailed(entities.size());
                        entities.forEach(entityFutureWrapper -> entityFutureWrapper.getFuture().setException(e));
                        if (e instanceof InterruptedException) {
                            log.info("[{}] Queue polling was interrupted", logName);
                            break;
                        } else {
                            log.error("[{}] Failed to save {} entities", logName, entities.size(), e);
                        }
                    } finally {
                        entities.clear();
                    }
                }
            });
    
            logExecutor.scheduleAtFixedRate(() -> {
                if (queue.size() > 0 || stats.getTotal() > 0 || stats.getSuccessful() > 0 || stats.getFailed() > 0) {
                    log.info("Queue-{} [{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", index,
                            params.getLogName(), queue.size(), stats.getTotal(), stats.getSuccessful(), stats.getFailed());
                    stats.reset();
                }
            }, params.getStatsPrintIntervalMs(), params.getStatsPrintIntervalMs(), TimeUnit.MILLISECONDS);
        }
    }
    

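    This method creates a single-thread executor that keeps consuming the queue's elements through the Consumer passed in; the Consumer's accept method wraps the DB operation. Each iteration waits up to maxDelay for a first element, drains up to batchSize - 1 more, saves the batch, and, if the batch was not full, sleeps for whatever remains of maxDelay.

    The queue is backed by a LinkedBlockingQueue (a two-lock queue implementation, so producing and consuming can proceed in parallel).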
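
    The poll-then-drain step of the consumer loop can be tried on its own. This is a minimal sketch using only java.util.concurrent; the class BatchDrainSketch and the method nextBatch are names invented for this illustration and are not part of ThingsBoard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BatchDrainSketch {

    // Waits up to maxDelayMs for a first element, then takes up to
    // batchSize - 1 more without blocking, as the loop in init does.
    static <E> List<E> nextBatch(BlockingQueue<E> queue, int batchSize, long maxDelayMs)
            throws InterruptedException {
        List<E> batch = new ArrayList<>(batchSize);
        E first = queue.poll(maxDelayMs, TimeUnit.MILLISECONDS);
        if (first == null) {
            return batch; // timed out: nothing to save in this round
        }
        batch.add(first);
        queue.drainTo(batch, batchSize - 1);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add(i);
        }
        System.out.println(nextBatch(queue, 3, 100)); // [0, 1, 2]
        System.out.println(nextBatch(queue, 3, 100)); // [3, 4]
        System.out.println(nextBatch(queue, 3, 100)); // [] after the 100 ms timeout
    }
}
```

    Blocking only on the first element means a lone data point waits at most maxDelay for company, while a busy queue is flushed in full batches without any waiting.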

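    3. Summary

    Consumer: the blocking queues are initialized during application startup (via the @PostConstruct annotation); after initialization, each queue has a single-thread executor consuming it in a loop.
    Producer: the save method of JpaPsqlTimeseriesDao.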
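
    Producer and consumer are tied together by the future attached to each queued element: the consumer loop calls getFuture().set(null) after a successful save and setException(e) on failure. The sketch below imitates that handoff with the JDK's CompletableFuture instead of Guava's futures. FutureHandoffSketch, Element and flush are hypothetical names, and the assumption that the queue's add returns the element's own future is an inference from the code shown, since TbSqlBlockingQueue.add itself is not listed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

final class FutureHandoffSketch<E> {

    // Hypothetical counterpart of TbSqlQueueElement: the entity plus the producer's future.
    private record Element<T>(T entity, CompletableFuture<Void> future) {}

    private final BlockingQueue<Element<E>> queue = new LinkedBlockingQueue<>();

    // Producer side: enqueue and hand back a future that is still pending.
    CompletableFuture<Void> add(E entity) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        queue.add(new Element<>(entity, future));
        return future;
    }

    // Consumer side: save everything currently queued, then resolve every future.
    void flush(Consumer<List<E>> saveFunction) {
        List<Element<E>> batch = new ArrayList<>();
        queue.drainTo(batch);
        if (batch.isEmpty()) {
            return;
        }
        try {
            saveFunction.accept(batch.stream().map(Element::entity).toList());
            batch.forEach(e -> e.future().complete(null));
        } catch (RuntimeException ex) {
            batch.forEach(e -> e.future().completeExceptionally(ex));
        }
    }

    public static void main(String[] args) {
        FutureHandoffSketch<String> tsQueue = new FutureHandoffSketch<>();
        // Plays the role of Futures.transform(tsQueue.add(entity), v -> dataPointDays, ...).
        CompletableFuture<Integer> saved = tsQueue.add("temperature").thenApply(v -> 1);
        System.out.println(saved.isDone()); // false: nothing has been written yet
        tsQueue.flush(batch -> { /* the DB write would happen here */ });
        System.out.println(saved.isDone()); // true
    }
}
```

    A caller of save therefore learns about success or failure of the write only once the batch containing its entity has been flushed.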
