美文网首页
java处理大数据量的优化

java处理大数据量的优化

作者: 墨色尘埃 | 来源:发表于2019-04-03 10:16 被阅读0次

    有一个需求:需要从网页抓取万条甚至十几万条数据,如果一次性抓取的话不仅慢而且有可能对系统产生影响。
    这里的思路是:先发送一条1个记录的请求,获取返回json,得到总条数。然后每次以一定定值去请求获取数据,比如一次100条数据,记录需要循环的次数。创建一个线程池,多线程执行
    定时任务

    
    /**
    * 以一个固定延迟时间5秒钟调用一次执行
    * 这个周期是以上一个调用任务的##完成时间##为基准,在上一个任务完成之后,20分钟后再次执行
    */
    @Scheduled(fixedDelay = 1000 * 60 * 20)//@Scheduled 可以作为一个触发源添加到一个方法中
    
    /**
    * 以一个固定延迟时间5秒钟调用一次执行
    * 这个周期是以上一个任务##开始时间##为基准,从上一任务开始执行后20分钟再次执行
    */
    @Scheduled(fixedRate  = 1000 * 60 * 20)//@Scheduled 可以作为一个触发源添加到一个方法中
    
    /**
    * 这里是在每天的13点30分执行一次
    */
    @Scheduled(cron = "0 34 13 * * ?")//如果你需要在特定的时间执行,就需要用到cron 了
    

    上面的源码中有3个方法,前2个方法实现的是每隔5秒运行一次。
    demo3方法实现的是在固定每天的某个时间点运行一次。
    【Spring】定时任务详解实例-@Scheduled

        private void outletsTask(List<Date> dates) {
            try {
                logger.info("outlets数据开始采集");
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
                for (Date date : dates) {
                    logger.info(String.format("outlets[%s]数据采集开始", format.format(date)));
                    JSONArray outletsNetData = getOutletsNetData(date);
                    System.out.println("outletsNetData.size():" + outletsNetData.size());
                    List<ExpRecordOutlet> expRecordOutlets = jsonToOutlet(outletsNetData, date);
    
                    try {
                        outletService.todayData(expRecordOutlets, date);
                        logger.info(String.format("outlets[%s]数据采集结束", format.format(date)));
                    } catch (Exception ex) {
                        logger.error("outlets数据更新失败", ex);
                    }
                }
                logger.info("outlets数据结束采集");
    
            } catch (Exception ex) {
                logger.error("outlets采集任务执行失败", ex);
            }
        }
    

    getOutletsNetData
    ①使用Callable实现
    计算循环次数,多线程执行。判断线程池中任务是否全部执行完毕pool.isTerminated(),若执行完毕再返回 list

    //纯正的Callable
    Callable<JSONArray> run = new Callable<JSONArray>() {
        @Override
        public JSONArray call() throws Exception {
            return null;
        }
    };
    
        //新版安监系统  网点数据
        private JSONArray getOutletsNetData(Date date) throws IOException, SessionTimeoutException, InterruptedException {
            JSONObject cityNetData0 = requestOutletData(date, 1);
            int total = (int) ((JSONObject) cityNetData0.get("data")).get("totalCount");
            int init = 100;// 每隔100条循环一次
            int cycelTotal = total / init;
            if (total % init != 0) {
                cycelTotal += 1;
                if (total < init) {
                    init = total;
                }
            }
            System.out.println("循环获取数据的次数:" + cycelTotal);//循环多少次
    
            ExecutorService pool = Executors.newFixedThreadPool(5);
            JSONArray jsonArray = new JSONArray();
            for (int i = 0; i < cycelTotal; i++) {
                try {
                    int finalInit = init;
                    Callable<JSONArray> run = new Callable<JSONArray>() {
                        @Override
                        public JSONArray call() throws Exception {
                            JSONObject cityNetData = requestOutletData(date, finalInit);
                            if (cityNetData.get("sessiontimeout").equals("true")) {
                                throw new SessionTimeoutException();
                            }
                            logger.debug("[cityNetData]" + cityNetData.get("data").getClass().toString());
                            logger.debug("[cityNetData]" + cityNetData.get("data").toString());
                            //对JSON数据预格式化
                            JSONArray outletNetDataArray = (JSONArray) ((JSONObject) cityNetData.get("data")).get("result");
                            for (int i1 = 0; i1 < outletNetDataArray.size(); i1++) {
                                jsonArray.add(outletNetDataArray.get(i1));
                            }
                            System.out.println("jsonArray" + jsonArray.size() + "/Thread:" + Thread.currentThread().getName());
                            Thread.sleep(1000);
                            return jsonArray;
                        }
                    };
                    pool.submit(run);
                } catch (Exception e) {
                    logger.info("获取失败,getOutletsNetData:{}", "xxxxx");
                    e.printStackTrace();
                }
            }
    
            //任务执行完毕后 关闭线程池 不再接收新任务
            pool.shutdown();
            while (true) {
                // 判断线程池中任务是否全部执行完毕  若执行完毕 再返回 list
                if (pool.isTerminated()) {
                    break;
                }
            }
            return jsonArray;
        }
    

    ①使用Runnable实现

        //新版安监系统  网点数据
        private JSONArray getOutletsNetData1(Date date) throws IOException, SessionTimeoutException {
            JSONObject cityNetData0 = requestOutletData(date, 1);
            int total = (int) ((JSONObject) cityNetData0.get("data")).get("totalCount");
            int init = 100;// 每隔100条循环一次
            int cycelTotal = total / init;
            if (total % init != 0) {
                cycelTotal += 1;
                if (total < init) {
                    init = total;
                }
            }
            System.out.println("循环获取数据的次数:" + cycelTotal);//循环多少次
    
            ExecutorService pool = Executors.newFixedThreadPool(5);
            JSONArray jsonArray = new JSONArray();
            for (int i = 0; i < cycelTotal; i++) {
                try {
                    int finalInit = init;
                    Runnable runnable = new Runnable() {
                        @Override
                        public void run() {
                            try {
                                JSONObject cityNetData = requestOutletData(date, finalInit);
                                if (cityNetData.get("sessiontimeout").equals("true")) {
                                    throw new SessionTimeoutException();
                                }
                                logger.debug("[cityNetData]" + cityNetData.get("data").getClass().toString());
                                logger.debug("[cityNetData]" + cityNetData.get("data").toString());
                                //对JSON数据预格式化
                                JSONArray outletNetDataArray = (JSONArray) ((JSONObject) cityNetData.get("data")).get
                                        ("result");
                                for (int i1 = 0; i1 < outletNetDataArray.size(); i1++) {
                                    jsonArray.add(outletNetDataArray.get(i1));
                                }
                                System.out.println("jsonArray" + jsonArray.size() + "/Thread:" + Thread.currentThread()
                                        .getName());
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (IOException e) {
                                e.printStackTrace();
                            } catch (SessionTimeoutException e) {
                                e.printStackTrace();
                            }
                        }
                    };
                    pool.execute(runnable);
                } catch (Exception e) {
                    logger.info("获取失败,getOutletsNetData:{}", "xxxxx");
                    e.printStackTrace();
                }
            }
    
            //任务执行完毕后 关闭线程池 不再接收新任务
            pool.shutdown();
            while (true) {
                // 判断线程池中任务是否全部执行完毕  若执行完毕 再返回 list
                if (pool.isTerminated()) {
                    break;
                }
            }
            return jsonArray;
        }
    

    两种方法优劣:由于Thread.sleep(1000);需要异常处理,所以这里使用Callable,因为Callable能抛出异常以及返回值,Runnable则不行>。所以优先选择使用Callable

    Callable和Runnable的区别如下:

    Callable定义的方法是call,而Runnable定义的方法是run。
    Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
    Callable的call方法可抛出异常,而Runnable的run方法不能抛出异常。

    java for 循环使用线程池优化
    使用for循环创建线程,利用线程池并发处理线程——线程池
    Java 四种线程池

    requestOutletData
    动态设置请求个数,其中%s可以使用String.format(jsonPage, dayStr, dayStr, size)替换,如下代码中jsonPage有三个%s,则String.format方法中有三个参数。

        public JSONObject requestOutletData(Date date, int size) throws IOException, SessionTimeoutException {
            //采集数据的URL
            String url = "http://10.11.100.83:8090/companybusiness/companybusiness!loadBranchList_city.action";
            //条件构造
            Map<String, String> map = new HashMap<>();
            String jsonPage = "{\"condition\":[{\"col\":\"\",\"value\":[\"全部\"]},{\"col\":\"select2\",\"value\":[\"-1\"]," +
                    "\"type\":\"=\"},{\"col\":\"order_pro\",\"value\":[\"YWLZB\"],\"type\":\"=\"}," +
                    "{\"col\":\"order_city\",\"value\":[\"YWLZB\"],\"type\":\"=\"},{\"col\":\"pro_code\"," +
                    "\"value\":[\"320000\"],\"type\":\"=\"},{\"col\":\"city_code\",\"value\":[\"320000\"]}," +
                    "{\"col\":\"order_wd\",\"value\":[\"YWZB\"],\"type\":\"=\"},{\"col\":\"startDate\"," +
                    "\"value\":[\"%s\"],\"type\":\">=\"},{\"col\":\"endDate\",\"value\":[\"%s\"],\"type\":\"<=\"}]," +
                    "\"pageNo\":\"1\",\"pageSize\":%s,\"pageSizes\":[10,20],\"dir\":\"DESC\"}\n";
            String dayStr = new SimpleDateFormat("yyyy-MM-dd").format(date);
            jsonPage = String.format(jsonPage, dayStr, dayStr, size);
            map.put("jsonpage", jsonPage);
            map.put("isAjax", "true");
            String result = Utils.requestUrlByFormData(App.cookie, url, map); //获取网页返回的JSON数据
            JSONObject cityNetData = JSON.parseObject(result);
            return cityNetData;
        }
    

    相关文章

      网友评论

          本文标题:java处理大数据量的优化

          本文链接:https://www.haomeiwen.com/subject/iqaobqtx.html