thingsboard大量使用队列,任务都不直接处理,放到队列里消费执行。
ts_kv表的更新任务同样是依赖队列执行。
ts_kv 的 DB 更新逻辑在 PsqlInsertTsRepository 类中:
@SqlTsDao
@PsqlDao
@Repository
@Transactional
public class PsqlInsertTsRepository extends AbstractInsertRepository implements InsertTsRepository<TsKvEntity> {

    /**
     * Upsert statement for the ts_kv table. On a primary-key conflict
     * (entity_id, key, ts) the row is overwritten with the incoming values.
     *
     * FIX: the UPDATE branch now reads from PostgreSQL's EXCLUDED pseudo-table
     * instead of repeating bind placeholders 9-13. The original statement bound
     * every value twice, which doubled the setter code below and made parameter
     * index drift easy to introduce.
     */
    private static final String INSERT_ON_CONFLICT_DO_UPDATE =
            "INSERT INTO ts_kv (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " +
            "VALUES (?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " +
            "ON CONFLICT (entity_id, key, ts) DO UPDATE SET " +
            "bool_v = EXCLUDED.bool_v, str_v = EXCLUDED.str_v, long_v = EXCLUDED.long_v, " +
            "dbl_v = EXCLUDED.dbl_v, json_v = EXCLUDED.json_v;";

    /**
     * Persists the given batch in a single JDBC batch round trip.
     *
     * @param entities entities drained from the ts queue; each one becomes one
     *                 statement in the batch
     */
    @Override
    public void saveOrUpdate(List<TsKvEntity> entities) {
        jdbcTemplate.batchUpdate(INSERT_ON_CONFLICT_DO_UPDATE, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                TsKvEntity tsKvEntity = entities.get(i);
                ps.setObject(1, tsKvEntity.getEntityId());
                ps.setInt(2, tsKvEntity.getKey());
                ps.setLong(3, tsKvEntity.getTs());
                // Exactly one of the typed value columns is expected to be
                // non-null; the rest are bound as SQL NULL.
                if (tsKvEntity.getBooleanValue() != null) {
                    ps.setBoolean(4, tsKvEntity.getBooleanValue());
                } else {
                    ps.setNull(4, Types.BOOLEAN);
                }
                // replaceNullChars presumably strips NUL characters from the
                // string before binding — inherited from AbstractInsertRepository.
                ps.setString(5, replaceNullChars(tsKvEntity.getStrValue()));
                if (tsKvEntity.getLongValue() != null) {
                    ps.setLong(6, tsKvEntity.getLongValue());
                } else {
                    ps.setNull(6, Types.BIGINT);
                }
                if (tsKvEntity.getDoubleValue() != null) {
                    ps.setDouble(7, tsKvEntity.getDoubleValue());
                } else {
                    ps.setNull(7, Types.DOUBLE);
                }
                ps.setString(8, replaceNullChars(tsKvEntity.getJsonValue()));
            }

            @Override
            public int getBatchSize() {
                return entities.size();
            }
        });
    }
}
一、实体入队
提交更新的逻辑在 JpaPsqlTimeseriesDao 类的 save 方法:
/**
 * Converts the given {@code tsKvEntry} into a {@link TsKvEntity} and hands it
 * to the asynchronous ts queue. The returned future completes when the batch
 * containing this entity has been flushed to the database, yielding the
 * data-point-days accounting value.
 */
@Override
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
    int dataPointDays = getDataPointDays(tsKvEntry, computeTtl(ttl));
    savePartitionIfNotExist(tsKvEntry.getTs());
    Integer keyId = getOrSaveKeyId(tsKvEntry.getKey());

    TsKvEntity record = new TsKvEntity();
    record.setEntityId(entityId.getId());
    record.setKey(keyId);
    record.setTs(tsKvEntry.getTs());
    // Only one of the typed values is present; the rest map to null columns.
    record.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
    record.setLongValue(tsKvEntry.getLongValue().orElse(null));
    record.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null));
    record.setStrValue(tsKvEntry.getStrValue().orElse(null));
    record.setJsonValue(tsKvEntry.getJsonValue().orElse(null));
    log.trace("Saving entity: {}", record);

    // Enqueue for batched persistence; map the queue's Void future to the
    // data-point-days result expected by the caller.
    return Futures.transform(tsQueue.add(record), v -> dataPointDays, MoreExecutors.directExecutor());
}
在最后 tsQueue.add(entity) 将待写入 DB 的实体放入队列。tsQueue 的 add 方法逻辑:
// Routes the element to one of the underlying queues: elements with the same
// hash always land in the same queue, so per-entity ordering is preserved.
// The 0x7FFFFFFF mask clears the sign bit so the modulo result is never
// negative; a null element falls back to queue 0.
public ListenableFuture<Void> add(E element) {
int queueIndex = element != null ? (hashCodeFunction.apply(element) & 0x7FFFFFFF) % maxThreads : 0;
return queues.get(queueIndex).add(element);
}
queues是一个队列list。
private final CopyOnWriteArrayList<TbSqlBlockingQueue<E>> queues = new CopyOnWriteArrayList<>();
add方法将传入的实体哈希后,存入list中的一个队列。
二、实体出队,并写入DB
tsQueue 定义在父类 AbstractChunkedAggregationTimeseriesDao 中:
public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSqlTimeseriesDao implements TimeseriesDao {

    protected TbSqlBlockingQueueWrapper<TsKvEntity> tsQueue;
    // remaining fields omitted

    /**
     * Wires up the ts queue once dependency injection has completed
     * (@PostConstruct). Entities are partitioned across queues by the hash of
     * their entity id, and each drained batch is sorted by the full primary
     * key before insertion.
     */
    @PostConstruct
    protected void init() {
        TbSqlBlockingQueueParams tsParams = TbSqlBlockingQueueParams.builder()
                .logName("TS")
                .batchSize(tsBatchSize)
                .maxDelay(tsMaxDelay)
                .statsPrintIntervalMs(tsStatsPrintIntervalMs)
                .statsNamePrefix("ts")
                .batchSortEnabled(batchSortEnabled)
                .build();

        // Same entity id -> same queue -> same consumer thread.
        Function<TsKvEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode();

        // Covers every part of the primary key (entity_id, key, ts), as the
        // wrapper's init contract requires to avoid deadlocks in cluster mode.
        Comparator<TsKvEntity> primaryKeyComparator =
                Comparator.comparing((Function<TsKvEntity, UUID>) AbstractTsKvEntity::getEntityId)
                        .thenComparing(AbstractTsKvEntity::getKey)
                        .thenComparing(AbstractTsKvEntity::getTs);

        tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, tsBatchThreads, statsFactory);
        // saveOrUpdate is handed over as the batch-consuming save function.
        tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v), primaryKeyComparator);
    }
}
由于init()方法加了@PostConstruct注解,这个方法会在当前bean属性注入完成后自动执行。
tsQueue.init
方法:
/**
 * Facade over a fixed set of {@link TbSqlBlockingQueue}s. Elements are spread
 * across the queues by a caller-supplied hash function, so elements with the
 * same hash always land in the same queue (and therefore on the same consumer
 * thread).
 */
@Slf4j
@Data
public class TbSqlBlockingQueueWrapper<E> {
    private final CopyOnWriteArrayList<TbSqlBlockingQueue<E>> queues = new CopyOnWriteArrayList<>();
    private final TbSqlBlockingQueueParams params;
    private ScheduledLogExecutorComponent logExecutor;
    private final Function<E, Integer> hashCodeFunction;
    private final int maxThreads;
    private final StatsFactory statsFactory;

    /**
     * Starts TbSqlBlockingQueues.
     *
     * @param logExecutor executor that will be printing logs and statistics
     * @param saveFunction function to save entities in database
     * @param batchUpdateComparator comparator to sort entities by primary key to avoid deadlocks in cluster mode
     *                              NOTE: you must use all of primary key parts in your comparator
     */
    public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, Comparator<E> batchUpdateComparator) {
        for (int queueIdx = 0; queueIdx < maxThreads; queueIdx++) {
            String statsName = params.getStatsNamePrefix() + ".queue." + queueIdx;
            MessagesStats queueStats = statsFactory.createMessagesStats(statsName);
            TbSqlBlockingQueue<E> queue = new TbSqlBlockingQueue<>(params, queueStats);
            queue.init(logExecutor, saveFunction, batchUpdateComparator, queueIdx);
            queues.add(queue);
        }
    }

    /**
     * Enqueues the element on the queue selected by its hash; nulls go to
     * queue 0. The sign-bit mask keeps the modulo result non-negative.
     */
    public ListenableFuture<Void> add(E element) {
        if (element == null) {
            return queues.get(0).add(element);
        }
        int nonNegativeHash = hashCodeFunction.apply(element) & 0x7FFFFFFF;
        return queues.get(nonNegativeHash % maxThreads).add(element);
    }

    public void destroy() {
        queues.forEach(TbSqlBlockingQueue::destroy);
    }
}
TbSqlBlockingQueueWrapper 是 TbSqlBlockingQueue 的包装类,里面封装了一个元素类型为 TbSqlBlockingQueue 的线程安全 list。init 方法中会创建 maxThreads(可配置)个 TbSqlBlockingQueue 并添加进该 list;每创建完一个 TbSqlBlockingQueue,都会调用其 init 方法:
public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
private final BlockingQueue<TbSqlQueueElement<E>> queue = new LinkedBlockingQueue<>();
// 省略其他字段
@Override
public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, Comparator<E> batchUpdateComparator, int index) {
executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sql-queue-" + index + "-" + params.getLogName().toLowerCase()));
executor.submit(() -> {
String logName = params.getLogName();
int batchSize = params.getBatchSize();
long maxDelay = params.getMaxDelay();
List<TbSqlQueueElement<E>> entities = new ArrayList<>(batchSize);
while (!Thread.interrupted()) {
try {
long currentTs = System.currentTimeMillis();
TbSqlQueueElement<E> attr = queue.poll(maxDelay, TimeUnit.MILLISECONDS);
if (attr == null) {
continue;
} else {
entities.add(attr);
}
queue.drainTo(entities, batchSize - 1);
boolean fullPack = entities.size() == batchSize;
log.debug("[{}] Going to save {} entities", logName, entities.size());
Stream<E> entitiesStream = entities.stream().map(TbSqlQueueElement::getEntity);
// DB操作真正发生在这
saveFunction.accept(
(params.isBatchSortEnabled() ? entitiesStream.sorted(batchUpdateComparator) : entitiesStream)
.collect(Collectors.toList())
);
entities.forEach(v -> v.getFuture().set(null));
stats.incrementSuccessful(entities.size());
if (!fullPack) {
long remainingDelay = maxDelay - (System.currentTimeMillis() - currentTs);
if (remainingDelay > 0) {
Thread.sleep(remainingDelay);
}
}
} catch (Exception e) {
stats.incrementFailed(entities.size());
entities.forEach(entityFutureWrapper -> entityFutureWrapper.getFuture().setException(e));
if (e instanceof InterruptedException) {
log.info("[{}] Queue polling was interrupted", logName);
break;
} else {
log.error("[{}] Failed to save {} entities", logName, entities.size(), e);
}
} finally {
entities.clear();
}
}
});
logExecutor.scheduleAtFixedRate(() -> {
if (queue.size() > 0 || stats.getTotal() > 0 || stats.getSuccessful() > 0 || stats.getFailed() > 0) {
log.info("Queue-{} [{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", index,
params.getLogName(), queue.size(), stats.getTotal(), stats.getSuccessful(), stats.getFailed());
stats.reset();
}
}, params.getStatsPrintIntervalMs(), params.getStatsPrintIntervalMs(), TimeUnit.MILLISECONDS);
}
}
这个方法会创建一个单线程的线程池,不断地通过传入的 Consumer 参数去消费队列里的元素;Consumer 的 accept 方法封装的就是 DB 操作。queue 的实现使用的是 LinkedBlockingQueue(一个 two-lock queue 实现,消费和生产可以并行)。
三、总结
消费者:阻塞队列在项目启动阶段初始化(通过@PostConstruct注解),初始化后,就会有一个单线程线程池循环消费。
生产者:JpaPsqlTimeseriesDao 的 save 方法。
网友评论