Querying large volumes of data in batches with multiple threads in Java (returning data via Callable)

Author: 墨色尘埃 | Published: 2019-05-23 18:02

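Notes
1 Do not parse with fastjson, which has had publicly disclosed vulnerabilities; use Jackson instead.
2 When requesting URL data with jsoup, do not set maxBodySize too large: with many requests it can lead to an OOM.
3 When a Callable requests each page of data, do not store the returned page in a member field; use a local variable inside call() instead.
4 When processing the result of List<Future<List<Map<String, Object>>>> futures = executorService.invokeAll(callables);, write each page's data to the database directly inside the for loop.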
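
Note 4 can be sketched as follows. This is a minimal illustration, not the article's code: the class PerPageHandling, the method runAndStore and the sink parameter are hypothetical names, and plain strings stand in for the Map<String, Object> rows. Each page is handed to the sink while iterating over the futures, so no merged list of all pages is built.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class PerPageHandling {

    // Runs every page task, then passes each page to the sink inside the loop.
    // Returns the total number of rows handed to the sink.
    static int runAndStore(List<Callable<List<String>>> pageTasks,
                           Consumer<List<String>> sink) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            int rows = 0;
            // invokeAll blocks until all tasks finish; futures come back in task order
            for (Future<List<String>> future : pool.invokeAll(pageTasks)) {
                List<String> page = future.get(); // local variable, one page at a time
                sink.accept(page);                // e.g. a batch insert (hypothetical)
                rows += page.size();
            }
            return rows;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<List<String>>> tasks = new ArrayList<>();
        for (int page = 1; page <= 3; page++) {
            final int p = page;
            tasks.add(() -> Arrays.asList("row-" + p + "-a", "row-" + p + "-b"));
        }
        int rows = runAndStore(tasks, page -> System.out.println("storing " + page));
        System.out.println("rows stored: " + rows);
    }
}
```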

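The earlier article "java处理大数据量的优化" (optimizing Java for large data volumes) contained a bug: the page-number parameter current was never passed into jsonPage, so every query returned the same page.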

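The idea is the same as before; only the multithreaded part differs. Suppose there are 3994 records and each query fetches 500: it takes 8 queries to read them all, much like paging through results. Using multiple threads means several threads query at the same time, each fetching one page. So the query for every page has to be added to a collection of tasks.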
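
The page arithmetic above can be checked with a small sketch. The class PagePlan and its methods are hypothetical names introduced only for illustration; the numbers 3994 and 500 come from the example in the text.

```java
import java.util.ArrayList;
import java.util.List;

class PagePlan {

    // Ceiling division: number of pages needed to cover `total` records.
    static int pageCount(int total, int pageSize) {
        return (total + pageSize - 1) / pageSize;
    }

    // 1-based page numbers, one per task; each task must receive its own number.
    static List<Integer> pageNumbers(int total, int pageSize) {
        List<Integer> pages = new ArrayList<>();
        int count = pageCount(total, pageSize);
        for (int current = 1; current <= count; current++) {
            pages.add(current);
        }
        return pages;
    }

    public static void main(String[] args) {
        System.out.println(pageCount(3994, 500));   // 8
        System.out.println(pageNumbers(3994, 500)); // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

Passing a distinct page number to every task is what avoids the "same page every time" bug described earlier.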

        for (int i = 0; i < times; i++) {
            // Create one task for this page (a Callable, not a thread)
            Callable<List<Map<String, String>>> callable = new ThreadCallable(date, current, init);
            // Add the pending task to the collection
            tasks.add(callable);
            current++;
        }

ThreadCallable implements the Callable interface and must override call(); the data is fetched inside call().

    @Override
    public List<Map<String, String>> call() throws Exception {
        // Fetch one page of data; keep it in a local variable rather than a member field
        List<Map<String, String>> enterpriseNetData = outletService.requestOutletData(date, bindex, num);
        System.out.println(enterpriseNetData.size() + "/Thread:" + Thread.currentThread().getName());
//        Thread.sleep(1000);
        return enterpriseNetData;
    }
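
For reference, a complete task class in this style might look like the sketch below. PageTask and fetchPage are hypothetical names, and fetchPage only fabricates placeholder strings in place of the real paged request. The inputs are final fields set once in the constructor, and the page itself exists only as a local variable in call().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class PageTask implements Callable<List<String>> {

    private final int current; // page number, fixed at construction
    private final int size;    // records per page

    PageTask(int current, int size) {
        this.current = current;
        this.size = size;
    }

    @Override
    public List<String> call() {
        // The page is a local variable: the task object keeps no reference to it
        // after call() returns, and no state is shared between tasks.
        List<String> page = fetchPage(current, size);
        return page;
    }

    // Hypothetical stand-in for the real paged query.
    private static List<String> fetchPage(int current, int size) {
        List<String> rows = new ArrayList<>();
        int first = (current - 1) * size;
        for (int i = 0; i < size; i++) {
            rows.add("record-" + (first + i));
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(new PageTask(2, 3).call()); // [record-3, record-4, record-5]
    }
}
```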

Handling the results returned by the threads

    /**
     * Method 1
     * New safety-supervision system: outlet data
     */
    public void getOutletsNetData(Date date) throws IOException, SessionTimeoutException, InterruptedException,
            ExecutionException, BusinessException {
        // Start time
        long start = System.currentTimeMillis();

        List<Map<String, Object>> countyNetData = countyService.getCountyNetData(new Date(), "320000");
        int size = countyNetData.size();
        int init = 1000; // page size: up to 1000 records per request
        // Starting page number. The backend is an Oracle database with its own paging wrapper; pages start at 1
        int current = 1;
        // Number of requests: one per county
        System.out.println("Number of data requests: " + size);
        // Callables produce the results
        List<Callable<List<Map<String, Object>>>> callables = new ArrayList<>();

        // One task per county; the pool's threads are reused. Every county starts from its first page, so current is 1
        for (int i = 0; i < size; i++) {
            // County code
            String cityCode = (String) countyNetData.get(i).get("DISTRICT_CODE");
            Callable<List<Map<String, Object>>> callable = new ExpRecordOutletCallable(this, date, current, init, cityCode);
            callables.add(callable);
        }

        // Fixed-size thread pool, to avoid creating too many threads
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // Futures are used to retrieve the results
        List<Future<List<Map<String, Object>>>> futures = executorService.invokeAll(callables);
        logger.info("Number of futures: " + futures.size());

        // Process the results returned by the tasks
        if (futures != null && futures.size() > 0) {
            for (Future<List<Map<String, Object>>> future : futures) {

                // Outlet data for one county
                List<Map<String, Object>> list = future.get();
                logger.info("outlet data: " + list.size());
                List<ExpRecordOutlet> expRecordOutlets = jsonToOutlet(list, date);
                todayData(expRecordOutlets, date);

            }
        }

        executorService.shutdown(); // shut down the thread pool
        while (true) {
            // Check whether every task in the pool has finished; return only once they have
            if (executorService.isTerminated()) {
                break;
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("Time spent querying with threads: " + (end - start) + "ms");
    }
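
As an alternative to polling isTerminated() in a loop, the shutdown can be sketched with a finally block and awaitTermination, which blocks instead of spinning. This is only an illustration: PoolShutdown and runAll are hypothetical names, Integer results stand in for the real lists, and the one-minute timeout is an arbitrary assumption. Because invokeAll already waits for every task, awaitTermination normally returns almost at once here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PoolShutdown {

    static List<Integer> runAll(List<Callable<Integer>> tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                // get() rethrows a task failure wrapped in ExecutionException
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown(); // runs even when a task failed
            // Blocks without busy-waiting; the timeout value is an assumption
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            final int n = i;
            tasks.add(() -> n * n);
        }
        System.out.println(runAll(tasks)); // [1, 4, 9]
    }
}
```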


Scraping data from the administration bureau system

ExpRecordOutletService
/**
 * Outlet data
 */
@Service
public class ExpRecordOutletService extends ServiceImpl<ExpRecordOutletMapper, ExpRecordOutlet> {

    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(ExpRecordOutletService.class);

    @Autowired
    ExpRecordCountCountyService countyService;

    @Transactional
    public void todayData(List<ExpRecordOutlet> list, Date date) throws BusinessException {

        boolean isDelete = delete(new EntityWrapper<ExpRecordOutlet>().eq("count_date", App.removeHMS(date)));

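        Integer maxId = 1;
        Map map = selectMap(Condition.create().setSqlSelect("max(id) as maxId"));
        // max(id) is null when the table is empty, so check the value as well as the map
        if (map != null && map.get("maxId") != null) {
            maxId = ((Integer) map.get("maxId"));
        }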

        if (list != null && list.size() > 0) {
            for (ExpRecordOutlet brand : list) {
                maxId += 1;
                brand.setId(maxId);
            }

            if (isDelete) {
                insertBatch(list);
            } else {
                throw new BusinessException("update failed");
            }
        }

    }

    @Async
    public void outletsTask(List<Date> dates) {
        logger.info("outlets data collection started");
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");

        for (Date date : dates) {
            try {
                logger.info(String.format("outlets[%s] data collection started", format.format(date)));
                getOutletsNetData(date);
//                List<ExpRecordOutlet> expRecordOutlets = jsonToOutlet(outletsNetData, date);
//                todayData(expRecordOutlets, date);
                logger.info(String.format("outlets[%s] data collection finished", format.format(date)));
            } catch (IOException e) {
                logger.error("outlets collection task failed", e);
                continue;
            } catch (InterruptedException e) {
                logger.error("outlets collection task failed", e);
                continue;
            } catch (ExecutionException e) {
                logger.error("outlets collection task failed", e);
                continue;
            } catch (SessionTimeoutException e) {
                logger.error("outlets collection task failed", e);
                continue;
            } catch (BusinessException e) {
                logger.error("outlets data update failed", e);
                continue;
            }
            logger.info("outlets data collection finished");
        }
    }


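    /**
     * Requests one page of outlet data for a county.
     *
     * @param date     the statistics date
     * @param current  the page number, starting at 1
     * @param size     the number of records per page
     * @param cityCode the county code
     * @return the outlet records on that page
     * @throws IOException
     * @throws SessionTimeoutException
     */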
    public List<Map<String, Object>> requestOutletData(Date date, int current, int size, String cityCode) throws IOException,
            SessionTimeoutException {

        // URL the data is collected from
        String url = "http://10.11.100.83:8090/companybusiness/companybusiness!loadBranchList_city.action";
        // Build the query conditions
        Map<String, String> map = new HashMap<>();
        String jsonPage = "{\"condition\":[{\"col\":\"\",\"value\":[\"全部\"]},{\"col\":\"select2\",\"value\":[\"-1\"]," +
                "\"type\":\"=\"},{\"col\":\"order_pro\",\"value\":[\"YWLZB\"],\"type\":\"=\"}," +
                "{\"col\":\"order_city\",\"value\":[\"YWLZB\"],\"type\":\"=\"},{\"col\":\"pro_code\"," +
                "\"value\":[\"320000\"],\"type\":\"=\"},{\"col\":\"city_code\",\"value\":[\"%s\"]}," +
                "{\"col\":\"order_wd\",\"value\":[\"YWZB\"],\"type\":\"=\"},{\"col\":\"startDate\"," +
                "\"value\":[\"%s\"],\"type\":\">=\"},{\"col\":\"endDate\",\"value\":[\"%s\"],\"type\":\"<=\"}]," +
                "\"pageNo\":\"%s\",\"pageSize\":%s,\"pageSizes\":[10,20],\"dir\":\"DESC\"}\n";
        String dayStr = new SimpleDateFormat("yyyy-MM-dd").format(date);
        jsonPage = String.format(jsonPage, cityCode, dayStr, dayStr, current, size);
        map.put("jsonpage", jsonPage);
        map.put("isAjax", "true");
        String result = RequestUtil.requestUrlByFormData(App.cookie, url, map); // JSON returned by the page

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // Convert the JSON string into a Map
        Map cityNetData = objectMapper.readValue(result, Map.class);

        Boolean sessiontimeout = (Boolean) cityNetData.get("sessiontimeout");
        // Boolean.TRUE.equals avoids a NullPointerException when the key is absent
        if (Boolean.TRUE.equals(sessiontimeout)) {
            throw new SessionTimeoutException();
        }

        List<Map<String, Object>> outletNetDataArray = (List<Map<String, Object>>) ((Map) cityNetData.get("data")).get("result");
        return outletNetDataArray;
    }

    /**
     * Queries the response that carries the total record count
     */
    public Map getTotal(Date date, int size) throws IOException, SessionTimeoutException {
        // URL the data is collected from
        String url = "http://10.11.100.83:8090/companybusiness/companybusiness!loadBranchList_city.action";
        // Build the query conditions
        Map<String, String> map = new HashMap<>();
        String jsonPage = "{\"condition\":[{\"col\":\"\",\"value\":[\"全部\"]},{\"col\":\"select2\",\"value\":[\"-1\"]," +
                "\"type\":\"=\"},{\"col\":\"order_pro\",\"value\":[\"YWLZB\"],\"type\":\"=\"}," +
                "{\"col\":\"order_city\",\"value\":[\"YWLZB\"],\"type\":\"=\"},{\"col\":\"pro_code\"," +
                "\"value\":[\"320000\"],\"type\":\"=\"},{\"col\":\"city_code\",\"value\":[\"320000\"]}," +
                "{\"col\":\"order_wd\",\"value\":[\"YWZB\"],\"type\":\"=\"},{\"col\":\"startDate\"," +
                "\"value\":[\"%s\"],\"type\":\">=\"},{\"col\":\"endDate\",\"value\":[\"%s\"],\"type\":\"<=\"}]," +
                "\"pageNo\":\"1\",\"pageSize\":%s,\"pageSizes\":[10,20],\"dir\":\"DESC\"}\n";
        String dayStr = new SimpleDateFormat("yyyy-MM-dd").format(date);
        jsonPage = String.format(jsonPage, dayStr, dayStr, size);
        map.put("jsonpage", jsonPage);
        map.put("isAjax", "true");
        String result = RequestUtil.requestUrlByFormData(App.cookie, url, map); // JSON returned by the page

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // Convert the JSON string into a Map
        Map cityNetData = objectMapper.readValue(result, Map.class);

        return cityNetData;
    }


    /**
     * Method 1
     * New safety-supervision system: outlet data
     */
    public void getOutletsNetData(Date date) throws IOException, SessionTimeoutException, InterruptedException,
            ExecutionException, BusinessException {
        long start = System.currentTimeMillis(); // start time

        List<Map<String, Object>> countyNetData = countyService.getCountyNetData(new Date(), "320000");
        int size = countyNetData.size();
        int init = 1000; // page size: up to 1000 records per request
        // Starting page number. The backend is an Oracle database with its own paging wrapper; pages start at 1
        int current = 1;
        System.out.println("Number of data requests: " + size); // one request per county
        // Callables produce the results
        List<Callable<List<Map<String, Object>>>> callables = new ArrayList<>();

        // One task per county; the pool's threads are reused. Every county starts from its first page, so current is 1
        for (int i = 0; i < size; i++) {
//        for (int i = 0; i < 2; i++) {
            String cityCode = (String) countyNetData.get(i).get("DISTRICT_CODE"); // county code
            // Create one task (a Callable, not a thread)
            Callable<List<Map<String, Object>>> callable = new ExpRecordOutletCallable(this, date, current, init, cityCode);

            callables.add(callable);
//            current++;
        }

        // Fixed-size thread pool, to avoid creating too many threads
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // Futures are used to retrieve the results
        List<Future<List<Map<String, Object>>>> futures = executorService.invokeAll(callables);
        logger.info("Number of futures: " + futures.size());

        // Process the results returned by the tasks
        if (futures != null && futures.size() > 0) {
            for (Future<List<Map<String, Object>>> future : futures) {

                // Outlet data for one county
                List<Map<String, Object>> list = future.get();
                logger.info("outlet data: " + list.size());
                List<ExpRecordOutlet> expRecordOutlets = jsonToOutlet(list, date);
                todayData(expRecordOutlets, date);

            }
        }

        executorService.shutdown(); // shut down the thread pool
        while (true) {
            // Check whether every task in the pool has finished; return only once they have
            if (executorService.isTerminated()) {
                break;
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("Time spent querying with threads: " + (end - start) + "ms");
    }

//    /**
//     * Method 1
//     * New safety-supervision system: outlet data
//     */
//    public void getOutletsNetData(Date date) throws IOException, SessionTimeoutException, InterruptedException,
//            ExecutionException, BusinessException {
//        long start = System.currentTimeMillis(); // start time
//
//        List<Map<String, Object>> countyNetData = countyService.getCountyNetData(new Date(), "320000");
//        int size = countyNetData.size();
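//        int init = 1000; // page size: up to 1000 records per request
//        // Starting page number. The backend is an Oracle database with its own paging wrapper; pages start at 1
//        int current = 1;
//        System.out.println("Number of data requests: " + size); // one request per county
//        // Callables produce the results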
//        List<Callable<List<Map<String, Object>>>> callables = new ArrayList<>();
//
//        // One task per county; the pool's threads are reused. Every county starts from its first page, so current is 1
//        for (int i = 0; i < size; i++) {
////        for (int i = 0; i < 2; i++) {
//            // County code
//            String cityCode = (String) countyNetData.get(i).get("DISTRICT_CODE");
//
//            // Fetch one page of data
//            List<Map<String, Object>> enterpriseNetData = requestOutletData(date, current, size, cityCode);
//            for (Map<String, Object> map : enterpriseNetData) {
//                map.put("province", 320000);
//                map.put("city", cityCode.substring(0, 4) + "00");
//                map.put("county", cityCode);
//            }
//            System.out.println(enterpriseNetData.size() + "/Thread:" + Thread.currentThread().getName());
//            Thread.sleep(1000);
//
//            List<ExpRecordOutlet> expRecordOutlets = jsonToOutlet(enterpriseNetData, date);
//            todayData(expRecordOutlets, date);
//
//        }
//
//        long end = System.currentTimeMillis();
//        System.out.println("Time spent querying with threads: " + (end - start) + "ms");
//    }

//
//    /**
//     * Method 2
//     *
//     * @param date
//     * @return
//     * @throws IOException
//     * @throws SessionTimeoutException
//     * @throws InterruptedException
//     */
//    // New safety-supervision system: outlet data
//    private JSONArray getOutletsNetDataa(Date date) throws IOException, SessionTimeoutException, InterruptedException {
//
//        // First query a single record to get the total count and page count, for use by the threads
//        JSONObject cityNetData0 = getTotal(date, 1);
//        int total = (int) ((JSONObject) cityNetData0.get("data")).get("totalCount");
//        int init = 1000; // page size: up to 1000 records per request
//        int cycelTotal = total / init; // number of loop iterations
//        if (total % init != 0) {
//            cycelTotal += 1;
//            if (total < init) {
//                init = total;
//            }
//        }
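//        System.out.println("Number of data requests: " + cycelTotal); // number of loop iterations
//        // Starting page number. The backend is an Oracle database with its own paging wrapper; pages start at 1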
//        int current = 0;
//
//        ExecutorService pool = Executors.newFixedThreadPool(5);
//        JSONArray jsonArray = new JSONArray();
//        for (int i = 0; i < cycelTotal; i++) {
//            try {
//                int finalInit = init;
//                current++;
//                int finalCurrent = current;
//                Callable<JSONArray> run = new Callable<JSONArray>() {
//                    @Override
//                    public JSONArray call() throws Exception {
//                        // Batch lookup: each iteration fetches one page of data
//                        JSONArray cityNetData = requestOutletData(date, finalCurrent, finalInit);
////                        if (cityNetData.get("sessiontimeout").equals("true")) {
////                            throw new SessionTimeoutException();
////                        }
////                        // Pre-format the JSON data
////                        JSONArray outletNetDataArray = (JSONArray) ((JSONObject) cityNetData.get("data")).get
//// ("result");
////                        for (int j = 0; j < outletNetDataArray.size(); j++) {
////                            jsonArray.add(outletNetDataArray.get(j));
////                        }
//                        jsonArray.addAll(cityNetData); // JSONArray implements the List interface
//                        System.out.println(jsonArray.size() + "/Thread:" + Thread.currentThread().getName());
//                        Thread.sleep(1000);
//                        return jsonArray;
//                    }
//                };
//
//                pool.submit(run);
//            } catch (Exception e) {
//                logger.info("fetch failed, getOutletsNetData:{}", "xxxxx");
//                e.printStackTrace();
//            }
//        }
//
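//        // After the tasks are done, shut down the pool; it accepts no new tasks
//        pool.shutdown();
//        while (true) {
//            // Check whether every task in the pool has finished; return the list only once they have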
//            if (pool.isTerminated()) {
//                break;
//            }
//        }
//        return jsonArray;
//    }


    public List<ExpRecordOutlet> jsonToOutlet(List<Map<String, Object>> cityNetData, Date date) {

        List<ExpRecordOutlet> result = new ArrayList<>();
        for (Map<String, Object> map : cityNetData) {
            String totalNum = ((String) map.get("TOTAL_NUM")).replace(",", "").trim();  // business volume
            String ywzb = ((String) map.get("YWZB")).replace("%", "").trim(); // share of business volume
            String pql = ((String) map.get("PQL")).replace(",", "").trim();   // delivery volume
            String pqzb = ((String) map.get("PQZB")).replace("%", "").trim();  // share of delivery volume
            String ltzb = ((String) map.get("LTZB")).trim(); // pickup-to-delivery ratio
            String totalSmNum = ((String) map.get("TOTAL_SM_NUM")).replace(",", "").trim();  // real-name volume
            String smlv = ((String) map.get("SMLV")).replace("%", "").replace("-", "").trim();  // real-name rate
            String totalDsNum = ((String) map.get("TOTAL_DS_NUM")).replace(",", "").trim();  // e-commerce volume
            String dslv = ((String) map.get("DSLV")).replace("%", "").replace("-", "").trim(); // e-commerce rate

            String smSj = ((String) map.get("SM_SJ")).replace(",", "").trim(); // real-name volume, loose parcels
            String smSjLv = ((String) map.get("SM_SJ_LV")).replace("%", "").replace("-", "").trim(); // real-name rate, loose parcels
            String smXy = ((String) map.get("SM_XY")).replace(",", "").trim(); // real-name volume, contract customers

            String dszb = ((String) map.get("DSZB")).replace("%", "").trim();
            String branchName = ((String) map.get("BRANCH_NAME")).trim(); // outlet name
            String brandId = ((String) map.get("BRAND_ID")).trim(); // company name

            String stationId = ((String) map.get("STATION_ID")).trim();
            String province = String.valueOf(map.get("province"));
            String city = ((String) map.get("city"));
            String county = ((String) map.get("county"));

            ExpRecordOutlet obj = new ExpRecordOutlet();

            obj.setProvince(province);
            obj.setCity(city);
            obj.setCounty(county);
            obj.setBusinessvolume(Integer.parseInt(totalNum));
            obj.setBusinessvolumeper(new BigDecimal(Double.parseDouble(ywzb) / 100));
            obj.setDelivervolume(Integer.parseInt(pql));
            obj.setDelivervolumeper(new BigDecimal(Double.parseDouble(pqzb) / 100));
            obj.setRealvolume(Integer.parseInt(totalSmNum));
            if (smlv != null && !"".equals(smlv)) {
                obj.setRealrate(new BigDecimal(Double.parseDouble(smlv) / 100));
            } else {
                obj.setRealrate(new BigDecimal(0));
            }
            obj.setEcomvolume(Integer.parseInt(totalDsNum));
            if (dslv != null && !"".equals(dslv)) {
                obj.setEcomrate(new BigDecimal(Double.parseDouble(dslv) / 100));
            } else {
                obj.setEcomrate(new BigDecimal(0));
            }

            obj.setBrandid(brandId);
            obj.setOutletname(branchName);
            obj.setOutletid(stationId);

            if (smXy != null && !"".equals(smXy)) {
                obj.setPartvolume(Integer.parseInt(smXy));
            } else {
                obj.setPartvolume(0);
            }
            if (smSj != null && !"".equals(smSj)) {
                obj.setRealpartvolume(Integer.parseInt(smSj));
            } else {
                obj.setRealpartvolume(0);
            }
            if (smSjLv != null && !"".equals(smSjLv)) {
                obj.setRealpartrate(new BigDecimal(Double.parseDouble(smSjLv) / 100));
            } else {
                obj.setRealpartrate(new BigDecimal(0));
            }

            obj.setCreateTime(new Date());
            obj.setCountDate(App.removeHMS(date));
            obj.setCreateUser("robot");
            result.add(obj);
        }
        return result;
    }
}
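
The string clean-up in jsonToOutlet repeats the same two conversions many times. A possible way to factor them out is sketched below; FieldParsers, parseCount and parseRate are hypothetical helpers, not part of the original service. As in the listing above, "-" is treated as a placeholder for a missing value. Parsing the text straight into a BigDecimal and shifting the decimal point avoids going through double, since new BigDecimal(double) carries over the binary representation of the double.

```java
import java.math.BigDecimal;

class FieldParsers {

    // "1,234" -> 1234; null, blank or "-" -> 0
    static int parseCount(String raw) {
        String s = raw == null ? "" : raw.replace(",", "").trim();
        return s.isEmpty() || s.equals("-") ? 0 : Integer.parseInt(s);
    }

    // "12.5%" -> 0.125; null, blank or "-" -> 0
    static BigDecimal parseRate(String raw) {
        String s = raw == null ? "" : raw.replace("%", "").replace("-", "").trim();
        // movePointLeft(2) divides by 100 exactly, with no floating-point step
        return s.isEmpty() ? BigDecimal.ZERO : new BigDecimal(s).movePointLeft(2);
    }

    public static void main(String[] args) {
        System.out.println(parseCount("1,234")); // 1234
        System.out.println(parseRate("12.5%"));  // 0.125
        System.out.println(parseRate("-"));      // 0
    }
}
```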
ExpRecordOutletCallable
public class ExpRecordOutletCallable implements Callable<List<Map<String, Object>>> {

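    private ExpRecordOutletService outletCopyService;  // the business service, passed in through the constructor; change the type to the service actually used

    private String search; // query condition; define this class's fields according to the conditions needed

    private int current; // current page number

    private int size; // number of records per page

//    private List page; // data returned by each page query

    // JSONArray implements the List interface
//    private List<Map<String, Object>> enterpriseNetData; // data returned by each page query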

    private Date date;

    private String cityCode;

    public ExpRecordOutletCallable(ExpRecordOutletService outletCopyService, Date date,
                                   int current, int size, String cityCode) throws
            IOException, SessionTimeoutException {
        this.outletCopyService = outletCopyService;
        this.date = date;
        this.current = current;
        this.size = size;
        this.cityCode = cityCode;
    }

    @Override
    public List<Map<String, Object>> call() throws Exception {
        // Fetch one page of data
        List<Map<String, Object>> enterpriseNetData = outletCopyService.requestOutletData(date, current, size, cityCode);

//        // Each thread queries 100 records at a time
//        enterpriseNetData = new JSONArray();
//        boolean flag = true;
//        while (flag) {
//            JSONArray data = outletCopyService.requestOutletData(date, current, size, cityCode);
//            if (data.size() != 0 && data.size() < size) {  // e.g. the request returns 95 records while size is 100
//                enterpriseNetData.addAll(data);
//                flag = false;
//                break;
//            }
//            if (data.size() != 0 && data.size() == size) {
//                current++;
//                enterpriseNetData.addAll(data);
//            }
//        }


        for (Map<String, Object> map : enterpriseNetData) {
            map.put("province", 320000);
            map.put("city", cityCode.substring(0, 4) + "00");
            map.put("county", cityCode);
        }

        System.out.println(enterpriseNetData.size() + "/Thread:" + Thread.currentThread().getName());

        Thread.sleep(1000);
        return enterpriseNetData;
    }
}
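
The commented-out loop inside call() hints at a variant in which one task keeps requesting pages for its county until a page comes back short. A minimal sketch of that idea follows; PageLoop and fetchAll are hypothetical names, and the BiFunction stands in for a call such as requestOutletData. Unlike the commented-out loop, this version also stops when a page is empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class PageLoop {

    // Requests pages 1, 2, 3, ... until a page is shorter than `size`.
    static <T> List<T> fetchAll(BiFunction<Integer, Integer, List<T>> fetchPage, int size) {
        List<T> all = new ArrayList<>();
        int current = 1;
        while (true) {
            List<T> page = fetchPage.apply(current, size);
            all.addAll(page);
            if (page.size() < size) { // short or empty page: nothing left to fetch
                break;
            }
            current++;
        }
        return all;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the remote source: 7 records, 3 per page
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<Integer> all = fetchAll((pageNo, pageSize) -> {
            int from = Math.min((pageNo - 1) * pageSize, source.size());
            int to = Math.min(from + pageSize, source.size());
            return source.subList(from, to);
        }, 3);
        System.out.println(all); // [1, 2, 3, 4, 5, 6, 7]
    }
}
```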
