/**
 * Runs one pass of the data-push job.
 *
 * <p>Loads every enabled push configuration and every pending push record, then,
 * record by record, submits one task per configuration to a worker pool and waits
 * for all of that record's tasks to finish before moving on to the next record.
 *
 * <p>Returns immediately (after logging) when there is no enabled push configuration.
 * When there are no pending records, no worker pool is created.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 *         for a record's tasks to complete; outstanding tasks are cancelled first
 */
public void pushDataTask() throws InterruptedException {
    logger.debug("===================>> 启动推送数据任务.. <<===================");

    // Push targets: only configurations in the "enabled" state.
    LambdaQueryWrapper<Sys_DataPushConfig> pushConfigWrapper = new LambdaQueryWrapper<>();
    pushConfigWrapper.eq(Sys_DataPushConfig::getState, StateType.启用.type);
    List<Sys_DataPushConfig> pushConfigList = pushConfigService.selectList(pushConfigWrapper);
    if (pushConfigList == null || pushConfigList.isEmpty()) {
        // No application to push to.
        logger.debug("===================>> 无推送应用数量。 <<===================");
        return;
    }
    logger.debug("===================>> 推送应用数量:" + pushConfigList.size());

    // Data to push: only records still in the "pending" state.
    LambdaQueryWrapper<Sys_DataPushRecord> dataPushRecordWrapper = new LambdaQueryWrapper<>();
    dataPushRecordWrapper.eq(Sys_DataPushRecord::getProcessStatus, ProcessStatusType.待处理.type);
    List<Sys_DataPushRecord> dataPushRecords = dataPushRecordService.selectList(dataPushRecordWrapper);
    // Guard against a null result the same way the configuration list is guarded.
    int recordCount = dataPushRecords == null ? 0 : dataPushRecords.size();
    logger.debug("===================>> 推送的数据量:" + recordCount);

    if (recordCount > 0) {
        // Created only once there is work, so the early return above cannot leak it.
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        // Caps how many tasks may be inside the processing section at the same time.
        Semaphore semaphore = new Semaphore(2);
        try {
            for (Sys_DataPushRecord record : dataPushRecords) {
                // Wait for every configuration's task before taking the next record, so
                // the per-record outcome across all applications can be decided together.
                CountDownLatch latch = new CountDownLatch(pushConfigList.size());
                for (Sys_DataPushConfig config : pushConfigList) {
                    logger.debug("===================>> 数据处理:" + recordCount);
                    Runnable runnable = () -> {
                        boolean acquired = false;
                        try {
                            semaphore.acquire();
                            acquired = true;
                            // Start processing the data.
                            logger.debug(Thread.currentThread().getName() + "------------------" + semaphore.availablePermits() + "\t" + "开始处理数据:config=>" + config.getAppName() + ",record:" + JSONUtil.toJsonStr(record));
                            // NOTE(review): looks like a stand-in for the real push call — confirm.
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            // Restore the flag so the pool can observe the interruption.
                            Thread.currentThread().interrupt();
                            logger.error("Push task interrupted", e);
                        } finally {
                            // Release only a permit that was actually obtained.
                            if (acquired) {
                                semaphore.release();
                            }
                            // Always count down, even on failure; otherwise latch.await()
                            // below would block forever.
                            latch.countDown();
                        }
                    };
                    executorService.execute(runnable);
                }
                latch.await();
                // Update the final processing result of this record.
                // ...
            }
        } catch (InterruptedException e) {
            // The caller was interrupted while waiting: cancel queued and running tasks.
            executorService.shutdownNow();
            throw e;
        } finally {
            // Runs on every exit path so the worker thread is never left alive.
            executorService.shutdown();
        }
    }
    logger.debug("===================>> 结束推送数据任务。 <<===================");
}
网友评论