美文网首页
java用多线程批次查询大量数据(Callable返回数据)方式

java用多线程批次查询大量数据(Callable返回数据)方式

作者: 墨色尘埃 | 来源:发表于2019-05-23 18:02 被阅读0次

    注意事项
    1 不要使用fastjson解析,被爆出有漏洞,使用Jackson
    2 使用jsoup请求url数据时,不要把maxBodySize设置得过大,因为请求次数多时会导致OOM
    3 Callable中请求每一页的数据时,不要设置一个成员变量去接收每页返回的结果,而是在call()方法中使用局部变量
    4 在处理List<Future<List<Map<String, Object>>>> futures = executorService.invokeAll(callables);返回结果时,直接在for循环里对每页的数据进行提交数据库操作

    之前的这篇文章有错误:没有将页码参数 `current` 传到 `jsonPage` 中,导致查询出来的数据都是同一页的(见【java处理大数据量的优化】)。

    思路还是一样的,只是在多线程处理的时候不一样。比如有3994条数据,每次查询500条,需要8次才能查完(3994 / 500 向上取整为 8),就好比翻页效果一样。利用多线程的意思就是,多个线程同时进行查询,每个线程查一页数据。所以需要把每一页的查询任务都放到任务集合中。

    image.png
            // Queue one Callable per page; `current` advances with the loop so each task gets its own page.
            for (int page = 0; page < times; page++, current++) {
                Callable<List<Map<String, String>>> pageTask = new ThreadCallable(date, current, init);
                tasks.add(pageTask);
            }
    

    ThreadCallable 实现了 Callable 接口,需要实现 call() 方法,我们就在 call() 方法里请求接口抓取数据。

        /**
         * Fetches one page of data via {@code outletService.requestOutletData(date, bindex, num)}.
         *
         * <p>The page is kept in a local variable instead of a member field, so a result can never
         * leak between invocations or threads that happen to share this instance.
         *
         * @return the rows returned by the service for this page
         */
        @Override
        public List<Map<String, String>> call() throws Exception {
            List<Map<String, String>> pageData = outletService.requestOutletData(date, bindex, num);
            System.out.println(pageData.size() + "/Thread:" + Thread.currentThread().getName());
            return pageData;
        }
    

    处理线程返回结果

        /**
         * Method 1: collects the outlet data of the new safety-supervision system for {@code date}.
         *
         * <p>One task per county runs on a fixed pool of 10 threads; every county is queried from
         * page 1 with 1000 rows per page. All rows are converted first and then written with a single
         * {@code todayData} call: {@code todayData} deletes every row of the count date before
         * inserting, so calling it once per county would leave only the last county in the table.
         */
        public void getOutletsNetData(Date date) throws IOException, SessionTimeoutException, InterruptedException,
                ExecutionException, BusinessException {
            long start = System.currentTimeMillis();

            // NOTE(review): the county list is looked up for "now" rather than for `date` — confirm intended.
            List<Map<String, Object>> countyNetData = countyService.getCountyNetData(new Date(), "320000");
            int pageSize = 1000;  // rows requested per page
            int firstPage = 1;    // the remote paging starts at 1
            System.out.println("循环获取数据的次数:" + countyNetData.size());

            // One task per county; every county starts from the first page.
            List<Callable<List<Map<String, Object>>>> callables = new ArrayList<>(countyNetData.size());
            for (Map<String, Object> county : countyNetData) {
                String cityCode = (String) county.get("DISTRICT_CODE");
                callables.add(new ExpRecordOutletCallable(this, date, firstPage, pageSize, cityCode));
            }

            // Fixed-size pool to bound the number of concurrent requests.
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            List<ExpRecordOutlet> outlets = new ArrayList<>();
            try {
                // invokeAll blocks until every task has completed, normally or exceptionally.
                List<Future<List<Map<String, Object>>>> futures = executorService.invokeAll(callables);
                logger.info("Future用于获取结果:" + futures.size());
                for (Future<List<Map<String, Object>>> future : futures) {
                    List<Map<String, Object>> list = future.get();
                    logger.info("outlet网点数据:" + list.size());
                    outlets.addAll(jsonToOutlet(list, date));
                }
            } finally {
                // All tasks are done here; shutting down only releases the worker threads,
                // and it also happens when future.get() throws.
                executorService.shutdown();
            }

            // Write once with the complete result (see method comment). With no counties the
            // original never touched the table, so neither do we.
            if (!callables.isEmpty()) {
                todayData(outlets, date);
            }

            long end = System.currentTimeMillis();
            System.out.println("线程查询数据用时:" + (end - start) + "ms");
        }
    

    java用多线程批次查询大量数据(Callable返回数据)方式
    java使用多线程及分页查询数据量很大的数据

    管局数据抓取

    ExpRecordOutletService
    /**
     * Outlet data service.
     *
     * <p>Requests per-county outlet statistics from the remote {@code loadBranchList_city.action}
     * endpoint, converts the JSON rows into {@link ExpRecordOutlet} entities and replaces the rows
     * stored for the count date.
     */
    @Service
    public class ExpRecordOutletService extends ServiceImpl<ExpRecordOutletMapper, ExpRecordOutlet> {

        private static final org.slf4j.Logger logger = LoggerFactory.getLogger(ExpRecordOutletService.class);

        /** Endpoint that returns outlet rows as JSON. */
        private static final String OUTLET_URL =
                "http://10.11.100.83:8090/companybusiness/companybusiness!loadBranchList_city.action";

        /** Rows requested per page when collecting a county. */
        private static final int PAGE_SIZE = 1000;

        /** Upper bound on concurrent requests against the remote system. */
        private static final int POOL_SIZE = 10;

        /**
         * Query template for the outlet list. Placeholders, in order: city code, start date,
         * end date, page number, page size.
         */
        private static final String JSON_PAGE_TEMPLATE =
                "{\"condition\":[{\"col\":\"\",\"value\":[\"全部\"]},{\"col\":\"select2\",\"value\":[\"-1\"]," +
                "\"type\":\"=\"},{\"col\":\"order_pro\",\"value\":[\"YWLZB\"],\"type\":\"=\"}," +
                "{\"col\":\"order_city\",\"value\":[\"YWLZB\"],\"type\":\"=\"},{\"col\":\"pro_code\"," +
                "\"value\":[\"320000\"],\"type\":\"=\"},{\"col\":\"city_code\",\"value\":[\"%s\"]}," +
                "{\"col\":\"order_wd\",\"value\":[\"YWZB\"],\"type\":\"=\"},{\"col\":\"startDate\"," +
                "\"value\":[\"%s\"],\"type\":\">=\"},{\"col\":\"endDate\",\"value\":[\"%s\"],\"type\":\"<=\"}]," +
                "\"pageNo\":\"%s\",\"pageSize\":%s,\"pageSizes\":[10,20],\"dir\":\"DESC\"}\n";

        /**
         * Shared JSON reader. ObjectMapper is thread-safe once configured, and requestOutletData is
         * called concurrently from the pool threads.
         */
        private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();

        @Autowired
        ExpRecordCountCountyService countyService;

        /** Builds the ObjectMapper with the same settings the request methods always used. */
        private static ObjectMapper createObjectMapper() {
            ObjectMapper mapper = new ObjectMapper();
            mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            return mapper;
        }

        /**
         * Replaces all stored outlet rows of {@code date}'s count date with {@code list}.
         *
         * <p>IDs are assigned manually as {@code max(id) + 1, + 2, ...}; this is not safe against
         * concurrent writers.
         *
         * <p>NOTE(review): when invoked through {@code this} (as getOutletsNetData does) the call
         * bypasses the Spring proxy, so {@code @Transactional} presumably does not apply there —
         * confirm the proxy mode.
         *
         * @throws BusinessException if the batch insert reports failure
         */
        @Transactional
        public void todayData(List<ExpRecordOutlet> list, Date date) throws BusinessException {
            // delete(...) returns false when no row matched, which is the normal case the first
            // time a date is collected, so its result must not be treated as a failure.
            delete(new EntityWrapper<ExpRecordOutlet>().eq("count_date", App.removeHMS(date)));

            if (list == null || list.isEmpty()) {
                return;
            }

            int maxId = currentMaxId();
            for (ExpRecordOutlet outlet : list) {
                outlet.setId(++maxId);
            }
            if (!insertBatch(list)) {
                throw new BusinessException("更新失败");
            }
        }

        /** Returns the table's current {@code max(id)}, or 1 when it cannot be read (the original seed). */
        private int currentMaxId() {
            Map map = selectMap(Condition.create().setSqlSelect("max(id) as maxId"));
            Object value = map == null ? null : map.get("maxId");
            // Depending on driver and column type max(id) may be Integer, Long or BigDecimal.
            return value instanceof Number ? ((Number) value).intValue() : 1;
        }

        /**
         * Collects the outlet data for each of {@code dates} in turn; a failing date is logged and
         * skipped, an interruption stops the whole run.
         */
        @Async
        public void outletsTask(List<Date> dates) {
            logger.info("outlets数据开始采集");
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");

            for (Date date : dates) {
                String day = format.format(date);
                try {
                    logger.info(String.format("outlets[%s]数据采集开始", day));
                    getOutletsNetData(date);
                    logger.info(String.format("outlets[%s]数据采集结束", day));
                } catch (InterruptedException e) {
                    // Restore the flag and stop: every following date would be interrupted as well.
                    Thread.currentThread().interrupt();
                    logger.error("outlets采集任务执行失败", e);
                    break;
                } catch (IOException | ExecutionException | SessionTimeoutException e) {
                    logger.error("outlets采集任务执行失败", e);
                } catch (BusinessException e) {
                    logger.error("outlets数据更新失败", e);
                }
            }
            logger.info("outlets数据结束采集");
        }

        /**
         * Requests one page of outlet rows of one county for a single day.
         *
         * @param date     day to query (used as both start and end date)
         * @param current  page number, starting at 1
         * @param size     rows per page
         * @param cityCode district code used as the {@code city_code} condition
         * @return the rows of {@code data.result}; empty when the response carries no result list
         * @throws SessionTimeoutException if the server reports {@code sessiontimeout}
         * @throws IOException             on transport/JSON errors or when {@code data} is missing
         */
        @SuppressWarnings("unchecked")
        public List<Map<String, Object>> requestOutletData(Date date, int current, int size, String cityCode) throws IOException,
                SessionTimeoutException {
            String dayStr = new SimpleDateFormat("yyyy-MM-dd").format(date);
            Map cityNetData = postQuery(buildJsonPage(cityCode, dayStr, String.valueOf(current), size));

            // Accept both a JSON boolean and a "true" string; a missing flag means "no timeout".
            if (Boolean.parseBoolean(String.valueOf(cityNetData.get("sessiontimeout")))) {
                throw new SessionTimeoutException();
            }

            Object data = cityNetData.get("data");
            if (!(data instanceof Map)) {
                throw new IOException("outlet response without 'data' object, city_code=" + cityCode);
            }
            Object rows = ((Map) data).get("result");
            return rows == null ? new ArrayList<>() : (List<Map<String, Object>>) rows;
        }

        /**
         * Requests the first page for the whole province ({@code city_code} 320000) and returns the
         * raw parsed response, so callers can read totals from it.
         */
        public Map getTotal(Date date, int size) throws IOException, SessionTimeoutException {
            String dayStr = new SimpleDateFormat("yyyy-MM-dd").format(date);
            return postQuery(buildJsonPage("320000", dayStr, "1", size));
        }

        /** Fills {@link #JSON_PAGE_TEMPLATE}; start and end date are both {@code dayStr}. */
        private static String buildJsonPage(String cityCode, String dayStr, String pageNo, int size) {
            return String.format(JSON_PAGE_TEMPLATE, cityCode, dayStr, dayStr, pageNo, size);
        }

        /** Posts {@code jsonPage} as form data to the outlet endpoint and parses the JSON reply. */
        private Map postQuery(String jsonPage) throws IOException, SessionTimeoutException {
            Map<String, String> form = new HashMap<>();
            form.put("jsonpage", jsonPage);
            form.put("isAjax", "true");
            String result = RequestUtil.requestUrlByFormData(App.cookie, OUTLET_URL, form);
            return OBJECT_MAPPER.readValue(result, Map.class);
        }

        /**
         * Method 1: collects the outlet data of the new safety-supervision system for {@code date}.
         *
         * <p>One task per county runs on a fixed pool; every county is queried from page 1. All rows
         * are converted first and written with a single todayData call, because todayData
         * replaces every row of the count date — calling it per county would keep only the last
         * county's rows.
         */
        public void getOutletsNetData(Date date) throws IOException, SessionTimeoutException, InterruptedException,
                ExecutionException, BusinessException {
            long start = System.currentTimeMillis();

            // NOTE(review): the county list is looked up for "now" rather than for `date` — confirm intended.
            List<Map<String, Object>> countyNetData = countyService.getCountyNetData(new Date(), "320000");
            logger.info("循环获取数据的次数:" + countyNetData.size());

            // One task per county; the remote paging starts at 1.
            List<Callable<List<Map<String, Object>>>> callables = new ArrayList<>(countyNetData.size());
            for (Map<String, Object> county : countyNetData) {
                String cityCode = (String) county.get("DISTRICT_CODE");
                callables.add(new ExpRecordOutletCallable(this, date, 1, PAGE_SIZE, cityCode));
            }

            ExecutorService executorService = Executors.newFixedThreadPool(POOL_SIZE);
            List<ExpRecordOutlet> outlets = new ArrayList<>();
            try {
                // invokeAll blocks until every task has completed, normally or exceptionally.
                List<Future<List<Map<String, Object>>>> futures = executorService.invokeAll(callables);
                logger.info("Future用于获取结果:" + futures.size());
                for (Future<List<Map<String, Object>>> future : futures) {
                    List<Map<String, Object>> list = future.get();
                    logger.info("outlet网点数据:" + list.size());
                    outlets.addAll(jsonToOutlet(list, date));
                }
            } finally {
                // All tasks are done here; shutting down only releases the worker threads,
                // and it also happens when future.get() throws.
                executorService.shutdown();
            }

            // With no counties the previous version never touched the table; keep that.
            if (!callables.isEmpty()) {
                todayData(outlets, date);
            }

            long end = System.currentTimeMillis();
            logger.info("线程查询数据用时:" + (end - start) + "ms");
        }

        /**
         * Converts raw outlet rows into entities stamped with {@code date}'s count date.
         *
         * <p>Counts may carry thousands separators ("1,234"); rates are percentages ("12.34%") and
         * are stored as fractions (0.1234). Blank or missing numeric values become 0.
         */
        public List<ExpRecordOutlet> jsonToOutlet(List<Map<String, Object>> cityNetData, Date date) {
            List<ExpRecordOutlet> result = new ArrayList<>(cityNetData.size());
            for (Map<String, Object> row : cityNetData) {
                ExpRecordOutlet obj = new ExpRecordOutlet();

                // Location tags added by ExpRecordOutletCallable.
                obj.setProvince(String.valueOf(row.get("province")));
                obj.setCity((String) row.get("city"));
                obj.setCounty((String) row.get("county"));

                obj.setBusinessvolume(parseCount(row, "TOTAL_NUM"));    // business volume
                obj.setBusinessvolumeper(parseRate(row, "YWZB"));       // business volume share
                obj.setDelivervolume(parseCount(row, "PQL"));           // delivery volume
                obj.setDelivervolumeper(parseRate(row, "PQZB"));        // delivery volume share
                obj.setRealvolume(parseCount(row, "TOTAL_SM_NUM"));     // real-name volume
                obj.setRealrate(parseRate(row, "SMLV"));                // real-name rate
                obj.setEcomvolume(parseCount(row, "TOTAL_DS_NUM"));     // e-commerce volume
                obj.setEcomrate(parseRate(row, "DSLV"));                // e-commerce rate

                obj.setBrandid(text(row, "BRAND_ID"));                  // company
                obj.setOutletname(text(row, "BRANCH_NAME"));            // outlet name
                obj.setOutletid(text(row, "STATION_ID"));

                obj.setPartvolume(parseCount(row, "SM_XY"));            // contract real-name volume
                obj.setRealpartvolume(parseCount(row, "SM_SJ"));        // single-piece real-name volume
                obj.setRealpartrate(parseRate(row, "SM_SJ_LV"));        // single-piece real-name rate

                obj.setCreateTime(new Date());
                obj.setCountDate(App.removeHMS(date));
                obj.setCreateUser("robot");
                result.add(obj);
            }
            return result;
        }

        /** Returns {@code row.get(key)} as a trimmed string, or {@code null} when absent. */
        private static String text(Map<String, Object> row, String key) {
            Object value = row.get(key);
            return value == null ? null : value.toString().trim();
        }

        /** Parses a count such as "1,234"; blank or missing becomes 0. */
        private static int parseCount(Map<String, Object> row, String key) {
            String raw = text(row, key);
            if (raw == null) {
                return 0;
            }
            String digits = raw.replace(",", "").trim();
            return digits.isEmpty() ? 0 : Integer.parseInt(digits);
        }

        /**
         * Parses a percentage such as "12.34%" into a fraction (0.1234). '-' characters (the
         * source's "no data" placeholder) are stripped; blank or missing becomes 0.
         */
        private static BigDecimal parseRate(Map<String, Object> row, String key) {
            String raw = text(row, key);
            if (raw == null) {
                return BigDecimal.ZERO;
            }
            String number = raw.replace("%", "").replace("-", "").trim();
            // The String constructor keeps the decimal digits exact; new BigDecimal(double) does not.
            return number.isEmpty() ? BigDecimal.ZERO : new BigDecimal(number).movePointLeft(2);
        }
    }
    
    ExpRecordOutletCallable
    /**
     * Fetches every outlet row of one county, page by page, and tags each row with its province,
     * city and county codes.
     *
     * <p>All per-call state lives in local variables of {@link #call()}, so results cannot leak
     * between invocations or threads.
     */
    public class ExpRecordOutletCallable implements Callable<List<Map<String, Object>>> {

        /** Pause after a county has been fetched, kept from the original (presumably throttling). */
        private static final long PAUSE_MILLIS = 1000L;

        /** Safety net against a server that ignores pageNo and keeps returning full pages. */
        private static final int MAX_PAGES = 100;

        // Service that issues the paged HTTP requests.
        private final ExpRecordOutletService outletCopyService;

        private final Date date;

        // First page to request.
        private final int current;

        // Rows requested per page.
        private final int size;

        // District (county) code; its first four characters + "00" form the city code.
        private final String cityCode;

        public ExpRecordOutletCallable(ExpRecordOutletService outletCopyService, Date date,
                                       int current, int size, String cityCode) throws
                IOException, SessionTimeoutException {
            this.outletCopyService = outletCopyService;
            this.date = date;
            this.current = current;
            this.size = size;
            this.cityCode = cityCode;
        }

        /**
         * Requests pages {@code current, current + 1, ...} until an empty or short page arrives,
         * then tags every row with {@code province}, {@code city} and {@code county}.
         *
         * @throws IllegalStateException if more than {@link #MAX_PAGES} full pages are returned
         */
        @Override
        public List<Map<String, Object>> call() throws Exception {
            List<Map<String, Object>> enterpriseNetData = new java.util.ArrayList<>();
            for (int page = current; ; page++) {
                if (page - current >= MAX_PAGES) {
                    throw new IllegalStateException("more than " + MAX_PAGES + " full pages for county "
                            + cityCode + "; does the server honour pageNo?");
                }
                List<Map<String, Object>> rows = outletCopyService.requestOutletData(date, page, size, cityCode);
                if (rows == null || rows.isEmpty()) {
                    break;
                }
                enterpriseNetData.addAll(rows);
                // A page shorter than requested is the last one; a non-positive size cannot be
                // paged meaningfully, so stop after the first request.
                if (size <= 0 || rows.size() < size) {
                    break;
                }
            }

            String city = cityCode.substring(0, 4) + "00";
            for (Map<String, Object> map : enterpriseNetData) {
                map.put("province", 320000);
                map.put("city", city);
                map.put("county", cityCode);
            }

            System.out.println(enterpriseNetData.size() + "/Thread:" + Thread.currentThread().getName());

            Thread.sleep(PAUSE_MILLIS);
            return enterpriseNetData;
        }
    }
    

    相关文章

      网友评论

          本文标题:java用多线程批次查询大量数据(Callable返回数据)方式

          本文链接:https://www.haomeiwen.com/subject/bgpbzqtx.html