美文网首页
多线程处理数据,限制并发数量,等待多线程执行结束反馈结果

多线程处理数据,限制并发数量,等待多线程执行结束反馈结果

作者: 承诺一时的华丽 | 来源:发表于2022-12-12 17:28 被阅读0次
     public void pushDataTask() throws InterruptedException {
            logger.debug("===================>> 启动推送数据任务.. <<===================");
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            // 可实现控制线程同时执行的数量 
            Semaphore semaphore = new Semaphore(2);
    
            // 消息推送
            LambdaQueryWrapper<Sys_DataPushConfig> pushConfigWrapper = new LambdaQueryWrapper<>();
            pushConfigWrapper.eq(Sys_DataPushConfig::getState, StateType.启用.type);
            List<Sys_DataPushConfig> pushConfigList = pushConfigService.selectList(pushConfigWrapper);
            if (pushConfigList == null || pushConfigList.isEmpty()) {
                // 无推送应用
                logger.debug("===================>> 无推送应用数量。 <<===================");
                return;
            }
    
            logger.debug("===================>> 推送应用数量:" + pushConfigList.size());
    
            // 推送数据
            LambdaQueryWrapper<Sys_DataPushRecord> dataPushRecordWrapper = new LambdaQueryWrapper<>();
            dataPushRecordWrapper.eq(Sys_DataPushRecord::getProcessStatus, ProcessStatusType.待处理.type);
            List<Sys_DataPushRecord> dataPushRecords = dataPushRecordService.selectList(dataPushRecordWrapper);
    
            logger.debug("===================>> 推送的数据量:" + dataPushRecords.size());
            for (Sys_DataPushRecord record : dataPushRecords) {
                // 可等待pushConfigList执行完后,再执行下一条数据,保证对多个应用数据处理结果的一致性
                CountDownLatch latch = new CountDownLatch(pushConfigList.size());
                for (Sys_DataPushConfig config : pushConfigList) {
                    logger.debug("===================>> 数据处理:" + dataPushRecords.size());
                    Runnable runnable = () -> {
                        try {
                            semaphore.acquire();
                            // 开始处理数据
                            logger.debug(Thread.currentThread().getName() + "------------------" + semaphore.availablePermits() + "\t" + "开始处理数据:config=>" + config.getAppName() + ",record:" + JSONUtil.toJsonStr(record));
                            Thread.sleep(1000);
                            semaphore.release();
                            latch.countDown();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    };
                    executorService.execute(runnable);
    
                }
                latch.await();
                // 更新该条数据的处理最终结果
                // ...
    
            }
            executorService.shutdown();
            logger.debug("===================>> 结束推送数据任务。 <<===================");
        }
    

    相关文章

      网友评论

          本文标题:多线程处理数据,限制并发数量,等待多线程执行结束反馈结果

          本文链接:https://www.haomeiwen.com/subject/piwfqdtx.html