美文网首页
java处理大数据量的优化

java处理大数据量的优化

作者: 墨色尘埃 | 来源:发表于2019-04-03 10:16 被阅读0次

有一个需求:需要从网页抓取万条甚至十几万条数据,如果一次性抓取的话不仅慢而且有可能对系统产生影响。
这里的思路是:先发送一条1个记录的请求,获取返回json,得到总条数。然后每次以一定定值去请求获取数据,比如一次100条数据,记录需要循环的次数。创建一个线程池,多线程执行
定时任务


/**
* 以一个固定延迟时间5秒钟调用一次执行
* 这个周期是以上一个调用任务的##完成时间##为基准,在上一个任务完成之后,20分钟后再次执行
*/
@Scheduled(fixedDelay = 1000 * 60 * 20)//@Scheduled 可以作为一个触发源添加到一个方法中

/**
* 以一个固定延迟时间5秒钟调用一次执行
* 这个周期是以上一个任务##开始时间##为基准,从上一任务开始执行后20分钟再次执行
*/
@Scheduled(fixedRate  = 1000 * 60 * 20)//@Scheduled 可以作为一个触发源添加到一个方法中

/**
* 这里是在每天的13点30分执行一次
*/
@Scheduled(cron = "0 34 13 * * ?")//如果你需要在特定的时间执行,就需要用到cron 了

上面的源码中有3个方法,前2个方法实现的是每隔5秒运行一次。
demo3方法实现的是在固定每天的某个时间点运行一次。
【Spring】定时任务详解实例-@Scheduled

    private void outletsTask(List<Date> dates) {
        try {
            logger.info("outlets数据开始采集");
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
            for (Date date : dates) {
                logger.info(String.format("outlets[%s]数据采集开始", format.format(date)));
                JSONArray outletsNetData = getOutletsNetData(date);
                System.out.println("outletsNetData.size():" + outletsNetData.size());
                List<ExpRecordOutlet> expRecordOutlets = jsonToOutlet(outletsNetData, date);

                try {
                    outletService.todayData(expRecordOutlets, date);
                    logger.info(String.format("outlets[%s]数据采集结束", format.format(date)));
                } catch (Exception ex) {
                    logger.error("outlets数据更新失败", ex);
                }
            }
            logger.info("outlets数据结束采集");

        } catch (Exception ex) {
            logger.error("outlets采集任务执行失败", ex);
        }
    }

getOutletsNetData
①使用Callable实现
计算循环次数,多线程执行。判断线程池中任务是否全部执行完毕pool.isTerminated(),若执行完毕再返回 list

//纯正的Callable
Callable<JSONArray> run = new Callable<JSONArray>() {
    @Override
    public JSONArray call() throws Exception {
        return null;
    }
};
    //新版安监系统  网点数据
    private JSONArray getOutletsNetData(Date date) throws IOException, SessionTimeoutException, InterruptedException {
        JSONObject cityNetData0 = requestOutletData(date, 1);
        int total = (int) ((JSONObject) cityNetData0.get("data")).get("totalCount");
        int init = 100;// 每隔100条循环一次
        int cycelTotal = total / init;
        if (total % init != 0) {
            cycelTotal += 1;
            if (total < init) {
                init = total;
            }
        }
        System.out.println("循环获取数据的次数:" + cycelTotal);//循环多少次

        ExecutorService pool = Executors.newFixedThreadPool(5);
        JSONArray jsonArray = new JSONArray();
        for (int i = 0; i < cycelTotal; i++) {
            try {
                int finalInit = init;
                Callable<JSONArray> run = new Callable<JSONArray>() {
                    @Override
                    public JSONArray call() throws Exception {
                        JSONObject cityNetData = requestOutletData(date, finalInit);
                        if (cityNetData.get("sessiontimeout").equals("true")) {
                            throw new SessionTimeoutException();
                        }
                        logger.debug("[cityNetData]" + cityNetData.get("data").getClass().toString());
                        logger.debug("[cityNetData]" + cityNetData.get("data").toString());
                        //对JSON数据预格式化
                        JSONArray outletNetDataArray = (JSONArray) ((JSONObject) cityNetData.get("data")).get("result");
                        for (int i1 = 0; i1 < outletNetDataArray.size(); i1++) {
                            jsonArray.add(outletNetDataArray.get(i1));
                        }
                        System.out.println("jsonArray" + jsonArray.size() + "/Thread:" + Thread.currentThread().getName());
                        Thread.sleep(1000);
                        return jsonArray;
                    }
                };
                pool.submit(run);
            } catch (Exception e) {
                logger.info("获取失败,getOutletsNetData:{}", "xxxxx");
                e.printStackTrace();
            }
        }

        //任务执行完毕后 关闭线程池 不再接收新任务
        pool.shutdown();
        while (true) {
            // 判断线程池中任务是否全部执行完毕  若执行完毕 再返回 list
            if (pool.isTerminated()) {
                break;
            }
        }
        return jsonArray;
    }

①使用Runnable实现

    //新版安监系统  网点数据
    private JSONArray getOutletsNetData1(Date date) throws IOException, SessionTimeoutException {
        JSONObject cityNetData0 = requestOutletData(date, 1);
        int total = (int) ((JSONObject) cityNetData0.get("data")).get("totalCount");
        int init = 100;// 每隔100条循环一次
        int cycelTotal = total / init;
        if (total % init != 0) {
            cycelTotal += 1;
            if (total < init) {
                init = total;
            }
        }
        System.out.println("循环获取数据的次数:" + cycelTotal);//循环多少次

        ExecutorService pool = Executors.newFixedThreadPool(5);
        JSONArray jsonArray = new JSONArray();
        for (int i = 0; i < cycelTotal; i++) {
            try {
                int finalInit = init;
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            JSONObject cityNetData = requestOutletData(date, finalInit);
                            if (cityNetData.get("sessiontimeout").equals("true")) {
                                throw new SessionTimeoutException();
                            }
                            logger.debug("[cityNetData]" + cityNetData.get("data").getClass().toString());
                            logger.debug("[cityNetData]" + cityNetData.get("data").toString());
                            //对JSON数据预格式化
                            JSONArray outletNetDataArray = (JSONArray) ((JSONObject) cityNetData.get("data")).get
                                    ("result");
                            for (int i1 = 0; i1 < outletNetDataArray.size(); i1++) {
                                jsonArray.add(outletNetDataArray.get(i1));
                            }
                            System.out.println("jsonArray" + jsonArray.size() + "/Thread:" + Thread.currentThread()
                                    .getName());
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (IOException e) {
                            e.printStackTrace();
                        } catch (SessionTimeoutException e) {
                            e.printStackTrace();
                        }
                    }
                };
                pool.execute(runnable);
            } catch (Exception e) {
                logger.info("获取失败,getOutletsNetData:{}", "xxxxx");
                e.printStackTrace();
            }
        }

        //任务执行完毕后 关闭线程池 不再接收新任务
        pool.shutdown();
        while (true) {
            // 判断线程池中任务是否全部执行完毕  若执行完毕 再返回 list
            if (pool.isTerminated()) {
                break;
            }
        }
        return jsonArray;
    }

两种方法优劣:由于Thread.sleep(1000);需要异常处理,所以这里使用Callable,因为Callable能抛出异常以及返回值,Runnable则不行>。所以优先选择使用Callable

Callable和Runnable的区别如下:

Callable定义的方法是call,而Runnable定义的方法是run。
Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
Callable的call方法可抛出异常,而Runnable的run方法不能抛出异常。

java for 循环使用线程池优化
使用for循环创建线程,利用线程池并发处理线程——线程池
Java 四种线程池

requestOutletData
动态设置请求个数,其中%s可以使用String.format(jsonPage, dayStr, dayStr, size)替换,如下代码中jsonPage有三个%s,则String.format方法中有三个参数。

    public JSONObject requestOutletData(Date date, int size) throws IOException, SessionTimeoutException {
        //采集数据的URL
        String url = "http://10.11.100.83:8090/companybusiness/companybusiness!loadBranchList_city.action";
        //条件构造
        Map<String, String> map = new HashMap<>();
        String jsonPage = "{\"condition\":[{\"col\":\"\",\"value\":[\"全部\"]},{\"col\":\"select2\",\"value\":[\"-1\"]," +
                "\"type\":\"=\"},{\"col\":\"order_pro\",\"value\":[\"YWLZB\"],\"type\":\"=\"}," +
                "{\"col\":\"order_city\",\"value\":[\"YWLZB\"],\"type\":\"=\"},{\"col\":\"pro_code\"," +
                "\"value\":[\"320000\"],\"type\":\"=\"},{\"col\":\"city_code\",\"value\":[\"320000\"]}," +
                "{\"col\":\"order_wd\",\"value\":[\"YWZB\"],\"type\":\"=\"},{\"col\":\"startDate\"," +
                "\"value\":[\"%s\"],\"type\":\">=\"},{\"col\":\"endDate\",\"value\":[\"%s\"],\"type\":\"<=\"}]," +
                "\"pageNo\":\"1\",\"pageSize\":%s,\"pageSizes\":[10,20],\"dir\":\"DESC\"}\n";
        String dayStr = new SimpleDateFormat("yyyy-MM-dd").format(date);
        jsonPage = String.format(jsonPage, dayStr, dayStr, size);
        map.put("jsonpage", jsonPage);
        map.put("isAjax", "true");
        String result = Utils.requestUrlByFormData(App.cookie, url, map); //获取网页返回的JSON数据
        JSONObject cityNetData = JSON.parseObject(result);
        return cityNetData;
    }

相关文章

网友评论

      本文标题:java处理大数据量的优化

      本文链接:https://www.haomeiwen.com/subject/iqaobqtx.html