美文网首页
java处理大数据量的优化(测试)

java处理大数据量的优化(测试)

作者: 墨色尘埃 | 来源:发表于2019-05-24 12:38 被阅读0次

    线程池:
    项目中永远不要自己去new线程,然后start,一般都是用线程池。
    定期发送短信业务可以用核心线程可以被回收的线程池,因为并不是时刻都在发短信。
    其他业务如果处理比较频繁,那就用固定线程数或者核心线程数固定,设置最大线程数的线程池。
    一个项目中,每个线程池最好专一做一个业务,你不用担心线程池多了会不会影响性能。数据库驱动也用到了线程池,redis驱动也用到了线程池,spring很多组件都用到了线程池,所以一个项目中会存在很多线程池,一般一个java进程里有两三百的活跃线程,都是因为项目中引用的其他库中用到的线程池,其实你自己用到的只是冰山一角。

    String url = "http://127.0.0.1:10002/api/authEnroll/page?current=1&size=1";作为测试URL,通过JSOUP抓取改URL的数据。这个URL是考试系统的一个GET接口,dev开发环境下可以不登陆通过浏览器访问获取数据。
    项目地址:E:\IDEAProject\sjcj_pc_server

    测试一

    TaskCallable
    TaskCallable实现Callable接口,重写call()方法,call()方法里实现了查询每一页数据并返回每页数据的功能

    public class TaskCallable implements Callable<List<Map<String, Object>> {
    
        TaskService taskService = new TaskService();
    
        private String search;//查询条件 根据条件来定义该类的属性
    
        private int bindex;//当前页数
    
        private int num;//每页查询多少条
    
        private List page;//每次分页查出来的数据
    
        //private JSONArray enterpriseNetData;//每次分页查出来的数据
    
        public TaskCallable(int bindex, int num) throws IOException, SessionTimeoutException {
            this.bindex = bindex;
            this.num = num;
        }
    
        @Override
        public List<Map<String, Object>> call() throws Exception {
            //分页查询数据库数据
            List<Map<String, Object>> enterpriseNetData = taskService.enterpriseNetData(bindex, num);
            System.out.println(enterpriseNetData.size() + "/Thread:" + Thread.currentThread().getName());
            return enterpriseNetData;
        }
    }
    

    TaskService

    maxBodySize太大的话多线程时会导致oom。如果一个请求就要申请9M,那么100多个请求,算下来都1个多G了,会导致OOM

    public class TaskService {
        private static final org.slf4j.Logger logger = LoggerFactory.getLogger(TaskService.class);
        /**
         * GET请求
         */
        public List<Map<String, Object>> enterpriseNetData(int current,int size) throws IOException, SessionTimeoutException {
            String url = "http://127.0.0.1:10002/api/authEnroll/page?current=" + current + "&size=" + size;
            Connection connection = Jsoup.connect(url)
                    .ignoreContentType(true)
                    .method(Connection.Method.GET)
                    .header("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:48.0) Gecko/20100101 Firefox/48.0")
                    .followRedirects(false)
                    .maxBodySize(10000000)   //body大小 1024*1024=1M, 太大的话多线程时会导致oom。如果一个请求就要申请9M,那么100多个请求,算下来都1个多G了,会导致OOM
                    .timeout(1000 * 100);
            String body = connection.execute().body();
    
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            //json字符串转为Map对象
            Map jsonObject = objectMapper.readValue(result, Map.class);
    
            logger.debug("[enterpriseData]" + jsonObject.get("data").getClass().toString());
            logger.debug("[enterpriseData]" + jsonObject.get("data").toString());
    
            List<Map<String, Object>> list = (JSONArray) ((JSONObject) jsonObject.get("data")).get("records");
            return list;
        }
    
        /**
         * 计算总数
         */
        public Map<String,Object> getEnterpriseNetData(int current, int size) throws IOException, SessionTimeoutException {
            String url = "http://127.0.0.1:10002/api/authEnroll/page?current=1&size=1";
            Connection connection = Jsoup.connect(url)
                    .ignoreContentType(true)
                    .method(Connection.Method.GET)
                    .header("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:48.0) Gecko/20100101 Firefox/48.0")
                    .followRedirects(false)
                    .maxBodySize(10000000)   //body大小
                    .timeout(1000 * 100);
            String body = connection.execute().body();
            //json字符串转为Map对象
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            Map<String,Object> jsonObject = objectMapper.readValue(result, Map.class);
    
            logger.debug("[enterpriseData]" + jsonObject.get("data").getClass().toString());
            logger.debug("[enterpriseData]" + jsonObject.get("data").toString());
            return jsonObject;
        }
    }
    

    ThreadTest
    测试类

    @RunWith(SpringRunner.class)
    public class ThreadTest {
    
        @Test
        public void test1() throws ExecutionException, InterruptedException, IOException, SessionTimeoutException {
    
            TaskService taskService = new TaskService();
            Map<String,Object> enterpriseNetData = taskService.getEnterpriseNetData(1, 1);
    
            Integer total = (Integer) enterpriseNetData.get("data").get("total");
            List<Map<String,Object>> records = (List<Map<String,Object>>) ( enterpriseNetData.get("data")).get("records");
            System.out.println(total);
    
            getMaxResult(total);
        }
    
        public List<Map<String,Object>> getMaxResult(int total) throws InterruptedException, ExecutionException, IOException,
                SessionTimeoutException {
            long start = System.currentTimeMillis();//开始时间
    
            //查询数据库总数量
            int count = total;
            int size = 1000;//一次查询多少条
            //需要查询的次数
            int times = count / size;
            if (count % size != 0) {
                times = times + 1;
            }
            //开始页数  连接的是orcle的数据库  封装的分页方式  我的是从1开始
            int current = 1;
    
            //Callable用于产生结果
            List<Callable<JSONArray>> tasks = new ArrayList<>();
            for (int i = 0; i < times; i++) {
                Callable<List<Map<String,Object>>> callable = new TaskCallable(current, size);
                tasks.add(callable);
                current++;
            }
    
            //定义固定长度的线程池  防止线程过多
            ExecutorService executorService = Executors.newFixedThreadPool(15);
            //Future用于获取结果
            List<Future<List<Map<String,Object>>>> futures = executorService.invokeAll(tasks);
    
            List<Map<String,Object>> list = new ArrayList<Map<String,Object>>();
            //处理线程返回结果
            if (futures != null && futures.size() > 0) {
                for (Future<JSONArray> future : futures) {
                    list.addAll(future.get());
                }
            }
    
            executorService.shutdown();//关闭线程池
            while (true) {
                // 判断线程池中任务是否全部执行完毕  若执行完毕 再返回 list
                if (executorService.isTerminated()) {
                    break;
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("线程查询数据用时:" + (end - start) + "ms");
            return list;
        }
    }
    

    测试二

    测试二使用匿名内部类方式new Callable出来,
    TaskService同测试一
    ThreadTest1

    package thread;
    
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.jsptpd.yzglj.SessionTimeoutException;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Created by HASEE on 2019/3/29 10:10
     */
    @RunWith(SpringRunner.class)
    public class ThreadTest1 {
    
    
        @Test
        public void test1() throws ExecutionException, InterruptedException, IOException, SessionTimeoutException {
    
            TaskService taskService = new TaskService();
            Map<String,Object> enterpriseNetData = taskService.getEnterpriseNetData(1, 1);
    
            Integer total = (Integer) (enterpriseNetData.get("data")).get("total");
            List<Map<String,Object>> records = (List<Map<String,Object>>) ( enterpriseNetData.get("data")).get("records");
            System.out.println(total);
    
            getMaxResult(total);
    
        }
    
    
        public List<Map<String,Object>> getMaxResult(int total) throws InterruptedException, ExecutionException, IOException,
                SessionTimeoutException {
            long start = System.currentTimeMillis();//开始时间
    
            //查询数据库总数量
            int count = total;
            int size = 1000;//一次查询多少条
            //需要查询的次数
            int times = count / size;
            if (count % size != 0) {
                times = times + 1;
            }
            //开始页数  连接的是orcle的数据库  封装的分页方式  我的是从1开始
            int current = 1;
    
            //Callable用于产生结果
            List<Callable<List<Map<String,Object>>>> tasks = new ArrayList<>();
    
            //定义固定长度的线程池  防止线程过多
            ExecutorService pool = Executors.newFixedThreadPool(15);
    
            TaskService taskService = new TaskService();
            JSONArray jsonArray = new JSONArray();
            for (int i = 0; i < times; i++) {
                int finalCurrent = current;
                Callable<List<Map<String,Object>>> run = new Callable<List<Map<String,Object>>>() {
                    @Override
                    public List<Map<String,Object>> call() throws Exception {
                        //查询每一页的数据
                        List<Map<String,Object>> cityNetData = taskService.enterpriseNetData(finalCurrent, size);
                        jsonArray.addAll(cityNetData); //JSONArray实现了List接口
                        System.out.println(jsonArray.size() + "/Thread:" + Thread.currentThread().getName());
                        Thread.sleep(1000);
                        return jsonArray;
                    }
                };
    
                current++;
                pool.submit(run);
            }
    
            pool.shutdown();//关闭线程池
            while (true) {
                // 判断线程池中任务是否全部执行完毕  若执行完毕 再返回 list
                if (pool.isTerminated()) {
                    break;
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("线程查询数据用时:" + (end - start) + "ms");
            return jsonArray;
        }
    }
    
    

    测试一和测试二的区别

    测试一自定义了一个Callable,测试二使用了匿名内部类,有点像Comparator和Compable比较器。
    测试一也是在for循环里面Callable<List<Map<String,Object>>> callable = new TaskCallable(current, size);
    测试二Callable<List<Map<String,Object>>> run = new Callable<List<Map<String,Object>>>() { ....... }

    测试一:①【将获取每页数据的任务放进线程池List<Callable<List<Map<String,Object>>>>中,然后pool.invokeAll(tasks)】,②【Future用于获取结果,最后处理线程返回结果,关闭线程池】。①(在for循环之内,即需要查询的次数)②(在for循环之外处理)
    测试二:获取每页数据,处理线程返回数据,将数据新增到集合中。(这一过程都是在for循环之内进行)

    测试一,每次查询100条数据。处理线程返回结果.png 测试二,每次查询的数据新增到集合中.png

    查看多线程启动情况

    360截图16640329394538.png 360截图16501111113816.png 360截图16220420358571.png

    相关文章

      网友评论

          本文标题:java处理大数据量的优化(测试)

          本文链接:https://www.haomeiwen.com/subject/tzwbzqtx.html