美文网首页《JAVA并发编程实战》示例程序
线程安全10 - 高并发请求 缩减selectOne --> s

线程安全10 - 高并发请求 缩减selectOne --> s

作者: 小超_8b2f | 来源:发表于2020-01-07 10:43 被阅读0次

1. 根据code、clinicId 两个参数查询版本

\color{red}{(注意:没有查询到数据时会阻塞,待修正)}

@Override
    public Param selectParamNew(Param param) {
        String id = SessionUtil.getClinicId() + "|" + param.getCode();
        Request request = new Request(UUID.randomUUID().toString(),id,SessionUtil.getClinicId(),param.getCode(),new CompletableFuture<Param>());
        queue.add(request);
        Param result = null;
        try{
            result = request.future.get();
        } catch ( InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return result;
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    private class Request {
        String serialNo;
        String id;
        Integer clinicId;
        String code;
        CompletableFuture<Param> future;
    }

    LinkedBlockingDeque<Request> queue = new LinkedBlockingDeque();



    @PostConstruct
    public void init() {
        ThreadPoolUtils t;
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2 - 1);
        User user = SessionUtil.getCurrentUser();

        //每隔10毫秒根据队列里堆积的查询id执行一次批量查询
        pool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                SessionUtil.setContext(user,"hello world");
                try {
                    List<Request> requests = new ArrayList<>();
                    Map<String, Request> serialNoRequestMap = new HashMap<>();
                    Set<String> ids = new HashSet<>();
                    if (queue.isEmpty()) {
                        return;
                    }
                    while (!queue.isEmpty()) {
                        Request request = queue.pop();
                        requests.add(request);
                        serialNoRequestMap.put(request.getSerialNo(), request);
                        ids.add(request.getId());
                    }
                    Map<String, Param> serialNoUserMap = selectBatch(requests);

                    for (Request request : requests) {
                        Param param = serialNoUserMap.get(request.getSerialNo());
                        request.getFuture().complete(param);
                    }
                }catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    SessionUtil.removeContext();
                }
                System.out.println("-----PostConstruct.run() end");
            }
        },0,10, TimeUnit.MILLISECONDS);
    }


    /**
     * 根据序列号查询
     * @param reqeusts
     * @return
     */
    private Map<String,Param> selectBatch(List<Request> reqeusts) {
        Map<String,Param> serialNo_param_map = new HashMap<>();
        Map<String,List<String>> idSerialNosMap = new HashMap<>();
        Map<Integer,Set<String>> clinicIdCodesMap = new HashMap<>(); //<clinicId,[code1,code2]>
        List<ParamBatchQuery> queryParamList = new ArrayList<>();

        for (Request request : reqeusts) {
            if(!clinicIdCodesMap.containsKey(request.getClinicId())) {
                Set<String> codeSet = new HashSet<>();
                codeSet.add(request.getCode());
                clinicIdCodesMap.put(request.getClinicId(),codeSet);
            } else {
                clinicIdCodesMap.get(request.getClinicId()).add(request.getCode());
            }

            if(!idSerialNosMap.containsKey(request.getId())) {
                List<String> serialNoList = new ArrayList<>();
                serialNoList.add(request.getSerialNo());
                idSerialNosMap.put(request.getId(),serialNoList);
            } else {
                idSerialNosMap.get(request.getId()).add(request.getSerialNo());
            }
        }

        List<ParamBatchQuery> listQueryParam = new ArrayList<>();
        for (Map.Entry<Integer, Set<String>> entry : clinicIdCodesMap.entrySet())
            listQueryParam.add(new ParamBatchQuery(entry.getKey(),entry.getValue()));

        List<Param> params = paramMapper.selectParamBatch(listQueryParam);

        System.out.println("----本次查询共获取:" + params.size() + "个结果");
        for(Param param : params) {
            List<String> serialNoList = idSerialNosMap.get(param.getClinicId()+"|"+param.getCode());
            for(String serialNo : serialNoList) {
                serialNo_param_map.put(serialNo,param);
            }
        }
        return serialNo_param_map;
    }

2. 一个参数版本:

    @Autowired
    UserMapper userMapper;

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class Request {
        String serialNo;
        Integer id;
        CompletableFuture<User> future;
    }

    LinkedBlockingDeque<Request> queue = new LinkedBlockingDeque();

    @PostConstruct
    public void init() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2 - 1);
        //每隔10毫秒根据队列里堆积的查询id执行一次批量查询
        pool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                List<Request> requests = new ArrayList<>();
                Map<String,Request> serialNoRequestMap = new HashMap<>();
                List<Integer> ids = new ArrayList<>();
                if(queue.isEmpty()) {
                    return;
                }
                while(!queue.isEmpty()) {
                    Request request = queue.pop();
                    requests.add(request);
                    serialNoRequestMap.put(request.getSerialNo(),request);
                    ids.add(request.getId());
                }

                System.out.println("-----ids : " + ids);
                Map<String, User> serialNoUserMap = selectBatch(requests);
//                List<User> usersResult = userMapper.selectList(ids);

                for(Request request : requests) {
                    User user = serialNoUserMap.get(request.getSerialNo());
                    request.getFuture().complete(user);
                }
                System.out.println("-----PostConstruct.run() end");
            }
        },0,10, TimeUnit.MILLISECONDS);
    }


    /**
     * 根据序列号查询
     * @param reqeusts
     * @return
     */
    public Map<String,User> selectBatch(List<Request> reqeusts) {
        Map<String,User> serialNo_user_map = new HashMap<>();
        List<Integer> ids = new ArrayList<>();
        Map<Integer,List<String>> idSerialNosMap = new HashMap<>();
        for (Request request : reqeusts) {
            request.getSerialNo();
            ids.add(request.getId());
            if(!idSerialNosMap.containsKey(request.getId())) {
                List<String> serialNoList = new ArrayList<>();
                serialNoList.add(request.getSerialNo());
                idSerialNosMap.put(request.getId(),serialNoList);
            } else {
                idSerialNosMap.get(request.getId()).add(request.getSerialNo());
            }
        }

        List<User> users = userMapper.selectList(ids);
        System.out.println("----本次查询共获取:" + users.size() + "个结果");
        for(User user : users) {
            List<String> serialNoList = idSerialNosMap.get(user.getId());
            for(String serialNo : serialNoList) {
                serialNo_user_map.put(serialNo,user);
            }
        }
        return serialNo_user_map;
    }

    @Override
    public User selectByPrimaryKey(Integer id){
        Request request = new Request(UUID.randomUUID().toString(),id,new CompletableFuture<User>());
        queue.add(request);
        User user = null;
        try{
            user = request.future.get();
        } catch ( InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return user;
//        return userMapper.selectByPrimaryKey(id);
    }

相关文章

  • 线程安全10 - 高并发请求 缩减selectOne --> s

    1. 根据code、clinicId 两个参数查询版本 2. 一个参数版本:

  • 并发—高并发

    并发:多个线程操作相同的资源,保证线程安全,合理使用资源高并发:服务能同时处理很多请求,提高程序性能 线程安全性 ...

  • Java并发基础

    并发:多个线程操作相同的资源,保证线程安全,合理使用资源 高并发:服务能同时处理很多请求,提高程序性能 (1230...

  • 高并发Java

    高并发Java(1):前言 高并发Java(2):多线程基础 高并发Java(3):Java内存模型和线程安全 高...

  • 并发编程Ⅰ 理论基础

    基本概念 1.并发:多个线程操作相同的资源,保证线程安全,合理利用资源 2.高并发:服务能同时处理很多请求,提高程...

  • 第一章

    Java并发编程与高并发解决方案知识点:线程安全;线程封闭;线程调度;同步容器;并发容器;AQS;J.UC 高并发...

  • java并发学习

    java 高并发 基本概念 并发: 多个线程操作相同的资源,保证线程安全,合理使用资源 高并发: 服务能同时处理很...

  • java其他类StringBuffer与StringBuilde

    1、StringBuffer 线程安全,效率低,用于并发 StringBuilder 运行更快,效率高,无线程安全...

  • 高并发与多线程

    高并发≠多线程,高并发是指系统短时间内遇到大量操作请求的情况,而多线程是指一种处理方式。 1. 实现高并发需要考虑...

  • 3、Java并发编程入门与高并发面试-并发编程与线程安全

    慕课网 Jimin老师 Java并发编程入门与高并发面试 学习笔记Java并发编程入门与高并发面试 线程安全: ...

网友评论

    本文标题:线程安全10 - 高并发请求 缩减selectOne --> s

    本文链接:https://www.haomeiwen.com/subject/ibfmnctx.html