美文网首页《JAVA并发编程实战》示例程序
线程安全10 - 高并发请求 缩减selectOne --> s

线程安全10 - 高并发请求 缩减selectOne --> s

作者: 小超_8b2f | 来源:发表于2020-01-07 10:43 被阅读0次

    1. 根据code、clinicId 两个参数查询版本

    (注意:没有查询到数据时会阻塞,待修正)

    /**
     * Resolves one parameter by placing a request on the batching queue and
     * blocking until the request's future is completed.
     *
     * @param param supplies the code to look up; the clinic id is read from {@code SessionUtil}
     * @return the value the future was completed with, or {@code null} when the
     *         wait is interrupted or the future completed exceptionally
     */
    @Override
        public Param selectParamNew(Param param) {
            // Read the clinic id once so the composite key and the request field always agree.
            Integer clinicId = SessionUtil.getClinicId();
            String code = param.getCode();
            String id = clinicId + "|" + code;
            Request request = new Request(UUID.randomUUID().toString(), id, clinicId, code, new CompletableFuture<Param>());
            queue.add(request);
            try {
                return request.future.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        private class Request {
            String serialNo;
            String id;
            Integer clinicId;
            String code;
            CompletableFuture<Param> future;
        }
    
        /** Requests waiting to be picked up by the scheduled batch task. */
        LinkedBlockingDeque<Request> queue = new LinkedBlockingDeque<>();
    
    
    
        /**
         * Starts the periodic task that drains the request queue and answers all
         * drained requests with a single batch query.
         *
         * Every drained request's future is always completed: normally with the
         * looked-up value (or {@code null} when no row matched), exceptionally when
         * the batch query fails. This keeps callers from blocking forever in
         * {@code future.get()}.
         */
        @PostConstruct
        public void init() {
            ScheduledExecutorService pool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2 - 1);
            // NOTE(review): the user is captured once at startup and reused for every batch — confirm a session user exists at this point.
            User user = SessionUtil.getCurrentUser();

            // Every 10 ms, run one batch query for the requests accumulated in the queue.
            pool.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    SessionUtil.setContext(user,"hello world");
                    List<Request> requests = new ArrayList<>();
                    try {
                        // Take everything currently queued in one step.
                        queue.drainTo(requests);
                        if (requests.isEmpty()) {
                            return;
                        }
                        Map<String, Param> serialNoParamMap = selectBatch(requests);

                        for (Request request : requests) {
                            request.getFuture().complete(serialNoParamMap.get(request.getSerialNo()));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        // Release the waiting callers; this is a no-op for futures already completed above.
                        for (Request request : requests) {
                            request.getFuture().completeExceptionally(e);
                        }
                    } finally {
                        SessionUtil.removeContext();
                    }
                    System.out.println("-----PostConstruct.run() end");
                }
            },0,10, TimeUnit.MILLISECONDS);
        }
    
    
        /**
         * Runs one batch query for a group of requests and maps each request's
         * serial number to the row that answers it.
         *
         * Requests are grouped by clinic id for the query, and by composite key
         * ({@code clinicId|code}) for routing results back, so several requests
         * for the same key share one returned row.
         *
         * @param reqeusts the requests drained from the queue
         * @return serial number to matching {@code Param}; requests with no
         *         matching row have no entry
         */
        private Map<String,Param> selectBatch(List<Request> reqeusts) {
            Map<String,List<String>> idSerialNosMap = new HashMap<>();
            Map<Integer,Set<String>> clinicIdCodesMap = new HashMap<>(); // <clinicId,[code1,code2]>

            for (Request request : reqeusts) {
                clinicIdCodesMap.computeIfAbsent(request.getClinicId(), k -> new HashSet<>()).add(request.getCode());
                idSerialNosMap.computeIfAbsent(request.getId(), k -> new ArrayList<>()).add(request.getSerialNo());
            }

            List<ParamBatchQuery> listQueryParam = new ArrayList<>();
            for (Map.Entry<Integer, Set<String>> entry : clinicIdCodesMap.entrySet()) {
                listQueryParam.add(new ParamBatchQuery(entry.getKey(),entry.getValue()));
            }

            List<Param> params = paramMapper.selectParamBatch(listQueryParam);

            System.out.println("----本次查询共获取:" + params.size() + "个结果");
            Map<String,Param> serialNo_param_map = new HashMap<>();
            for (Param param : params) {
                List<String> serialNoList = idSerialNosMap.get(param.getClinicId()+"|"+param.getCode());
                if (serialNoList == null) {
                    // Row whose key nobody asked for; skip it rather than fail the whole batch with an NPE.
                    continue;
                }
                for (String serialNo : serialNoList) {
                    serialNo_param_map.put(serialNo,param);
                }
            }
            return serialNo_param_map;
        }
    

    2. 一个参数版本:

        // Injected mapper; selectBatch calls userMapper.selectList(ids) to load users in bulk.
        @Autowired
        UserMapper userMapper;
    
        /**
         * One pending user lookup waiting on the batching queue.
         * NOTE(review): this is a non-static inner class although it does not appear
         * to use the enclosing instance — consider making it static.
         */
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        class Request {
            // Unique per call (a random UUID); used to route the batch result back to this request.
            String serialNo;
            // Id of the user being looked up.
            Integer id;
            // Completed by the scheduled batch task; the caller blocks on it.
            CompletableFuture<User> future;
        }
    
        /** Requests waiting to be picked up by the scheduled batch task. */
        LinkedBlockingDeque<Request> queue = new LinkedBlockingDeque<>();
    
        /**
         * Starts the periodic task that drains the request queue and answers all
         * drained requests with a single batch query.
         *
         * Failures are caught inside the task. An exception escaping a
         * {@code scheduleAtFixedRate} task cancels all later runs, which would
         * leave every later caller blocked forever. On failure the drained
         * requests' futures are completed exceptionally instead.
         */
        @PostConstruct
        public void init() {
            ScheduledExecutorService pool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2 - 1);
            // Every 10 ms, run one batch query for the ids accumulated in the queue.
            pool.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    List<Request> requests = new ArrayList<>();
                    // Take everything currently queued in one step.
                    queue.drainTo(requests);
                    if (requests.isEmpty()) {
                        return;
                    }
                    try {
                        List<Integer> ids = new ArrayList<>();
                        for (Request request : requests) {
                            ids.add(request.getId());
                        }
                        System.out.println("-----ids : " + ids);

                        Map<String, User> serialNoUserMap = selectBatch(requests);
                        for (Request request : requests) {
                            request.getFuture().complete(serialNoUserMap.get(request.getSerialNo()));
                        }
                    } catch (RuntimeException e) {
                        e.printStackTrace();
                        // Release the waiting callers; this is a no-op for futures already completed above.
                        for (Request request : requests) {
                            request.getFuture().completeExceptionally(e);
                        }
                    }
                    System.out.println("-----PostConstruct.run() end");
                }
            },0,10, TimeUnit.MILLISECONDS);
        }
    
    
        /**
         * Loads the users for a group of requests with one mapper call and maps
         * each request's serial number to the user that answers it.
         *
         * Requests for the same id are grouped, so each distinct id is sent to
         * the mapper once and the returned user is shared by all of them.
         *
         * @param reqeusts the requests drained from the queue
         * @return serial number to matching {@code User}; requests with no
         *         matching row have no entry
         */
        public Map<String,User> selectBatch(List<Request> reqeusts) {
            // Insertion-ordered so the id list handed to the mapper keeps first-seen order.
            Map<Integer,List<String>> idSerialNosMap = new LinkedHashMap<>();
            for (Request request : reqeusts) {
                idSerialNosMap.computeIfAbsent(request.getId(), k -> new ArrayList<>()).add(request.getSerialNo());
            }
            List<Integer> ids = new ArrayList<>(idSerialNosMap.keySet());

            List<User> users = userMapper.selectList(ids);
            System.out.println("----本次查询共获取:" + users.size() + "个结果");

            Map<String,User> serialNo_user_map = new HashMap<>();
            for (User user : users) {
                List<String> serialNoList = idSerialNosMap.get(user.getId());
                if (serialNoList == null) {
                    // Row whose id nobody asked for; skip it rather than fail the whole batch with an NPE.
                    continue;
                }
                for (String serialNo : serialNoList) {
                    serialNo_user_map.put(serialNo,user);
                }
            }
            return serialNo_user_map;
        }
    
        /**
         * Resolves one user by placing a request on the batching queue and
         * blocking until the request's future is completed.
         *
         * @param id id of the user to look up
         * @return the value the future was completed with, or {@code null} when the
         *         wait is interrupted or the future completed exceptionally
         */
        @Override
        public User selectByPrimaryKey(Integer id){
            Request request = new Request(UUID.randomUUID().toString(), id, new CompletableFuture<User>());
            queue.add(request);
            try {
                return request.future.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            return null;
        }
    

    相关文章

      网友评论

        本文标题:线程安全10 - 高并发请求 缩减selectOne --> s

        本文链接:https://www.haomeiwen.com/subject/ibfmnctx.html