美文网首页JUC并发包源码实战
J.U.C并发包之 FutrueTask

J.U.C并发包之 FutrueTask

作者: 语落心生 | 来源:发表于2019-07-08 11:43 被阅读0次

    最近在公司代码中看到使用过J.U.C并发包。决定先更新这部分的内容,等待有空再续写jvm和redis专题的文章

    FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果

    来看到源码中构造方法的解释

     /**
         * Creates a {@code FutureTask} that will, upon running, execute the
         * given {@code Runnable}, and arrange that {@code get} will return the
         * given result on successful completion.
         *
         * @param runnable the runnable task
         * @param result the result to return on successful completion. If
         * you don't need a particular result, consider using
         * constructions of the form:
         * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
         * @throws NullPointerException if the runnable is null
         */
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    

    此类的目的是作为一个异步任务,将启动一个线程去执行任务,并在其执行成功后获取结果

    FutureTask执行多任务计算的使用场景

    我们先来看一下线程池的结构

    theadPool.jpg

    每一个线程池都存在一个阻塞队列,这里的队列是用来保存线程提交的任务的。而FutrueTask将提交任务和计算任务分为两部分,主线程负责提交需要执行的任务到阻塞队列,而子线程负责从阻塞队列中取出任务进行计算。这样进行异步任务可以大大减少操作耗时

    源码中的run方法

    public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    
    

    demo演示

    package com.corrent.demo.web;
    
    import org.apache.catalina.LifecycleState;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * @Author complone
     * @Description  异步执行子任务
     * @Date 11:34 2019/7/8
     * @Param 
     * @return 
     **/
    public class DemoTask {
    
    
    
        public static void main(String[] args){
            DemoTask inst=new DemoTask();
            // 创建任务集合
            List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
            // 创建线程池
            ExecutorService exec = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 10; i++) {
                // 传入Callable对象创建FutureTask对象
                FutureTask<Integer> ft = new FutureTask<Integer>(inst.new ComputeTask(i, ""+i));
                taskList.add(ft);
                // 提交给线程池执行任务,也可以通过exec.invokeAll(taskList)一次性提交所有任务;
                exec.submit(ft);
            }
    
            System.out.println("所有计算任务提交完毕, 主线程接着干其他事情!");
    
            // 开始统计各计算线程计算结果
            Integer totalResult = 0;
            for (FutureTask<Integer> ft : taskList) {
                try {
                    //FutureTask的get方法会自动阻塞,直到获取计算结果为止
                    totalResult = totalResult + ft.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
    
            // 关闭线程池
            exec.shutdown();
            System.out.println("多任务计算后的总结果是:" + totalResult);
    
        }
    
        private class ComputeTask implements Callable<Integer> {
    
            private Integer result = 0;
            private String taskName = "";
    
            public ComputeTask(Integer iniResult, String taskName){
                result = iniResult;
                this.taskName = taskName;
                System.out.println("生成子线程计算任务: "+taskName);
            }
    
            public String getTaskName(){
                return this.taskName;
            }
    
            @Override
            public Integer call() throws Exception {
                // TODO Auto-generated method stub
    
                for (int i = 0; i < 100; i++) {
                    result =+ i;
                }
                // 休眠5秒钟,观察主线程行为,预期的结果是主线程会继续执行,到要取得FutureTask的结果是等待直至完成。
                Thread.sleep(5000);
                System.out.println("子线程计算任务: "+taskName+" 执行完成!");
                return result;
            }
        }
    }
    
    

    result:

    生成子线程计算任务: 0
    生成子线程计算任务: 1
    生成子线程计算任务: 2
    生成子线程计算任务: 3
    生成子线程计算任务: 4
    生成子线程计算任务: 5
    生成子线程计算任务: 6
    生成子线程计算任务: 7
    生成子线程计算任务: 8
    生成子线程计算任务: 9
    所有计算任务提交完毕, 主线程接着干其他事情!
    子线程计算任务: 1 执行完成!
    子线程计算任务: 0 执行完成!
    子线程计算任务: 2 执行完成!
    子线程计算任务: 3 执行完成!
    子线程计算任务: 4 执行完成!
    子线程计算任务: 5 执行完成!
    子线程计算任务: 6 执行完成!
    子线程计算任务: 8 执行完成!
    子线程计算任务: 7 执行完成!
    子线程计算任务: 9 执行完成!
    多任务计算后的总结果是:990
    

    FutureTask在高并发环境下确保任务只执行一次

    在很多高并发的环境下,往往我们只需要某些任务只执行一次。这种使用情景FutureTask的特性恰能胜任。举一个例子,假设有一个带key的连接池,当key存在时,即直接返回key对应的对象;当key不存在时,则创建连接。

    private Map<String, Connection> connectionPool = new HashMap<String, Connection>();  
    private ReentrantLock lock = new ReentrantLock();  
      
    public Connection getConnection(String key){  
        try{  
            lock.lock();  
            if(connectionPool.containsKey(key)){  
                return connectionPool.get(key);  
            }  
            else{  
                //创建 Connection  
                Connection conn = createConnection();  
                connectionPool.put(key, conn);  
                return conn;  
            }  
        }  
        finally{  
            lock.unlock();  
        }  
    }  
      
    //创建Connection  
    private Connection createConnection(){  
        return null;  
    }
    

    通过FutrueTask异步获取任务结果

    private ConcurrentHashMap<String,FutureTask<Connection>>connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();  
      
    public Connection getConnection(String key) throws Exception{  
        FutureTask<Connection>connectionTask=connectionPool.get(key);  
        if(connectionTask!=null){  
            return connectionTask.get();  
        }  
        else{  
            Callable<Connection> callable = new Callable<Connection>(){  
                @Override  
                public Connection call() throws Exception {  
                    // TODO Auto-generated method stub  
                    return createConnection();  
                }  
            };  
            FutureTask<Connection>newTask = new FutureTask<Connection>(callable);  
            connectionTask = connectionPool.putIfAbsent(key, newTask);  
            if(connectionTask==null){  
                connectionTask = newTask;  
                connectionTask.run();  
            }  
            return connectionTask.get();  
        }  
    }  
      
    //创建Connection  
    private Connection createConnection(){  
        return null;  
    }
    

    而目前的业务面临这样的场景

    有多家药品机构,需要在组装查询条件后,即时显示查询后不同平台(公众端,服务端,运营端)的结果。
    我们可以把每一个查询命令用事务控制,保证每次提交的原子性。且在组装的时候把每一个查询任务交给主线程,查询到的结果交给子线程进行回调

     CopyOnWriteArrayList<FutureTask<BaseResult>> taskList = new CopyOnWriteArrayList<>();
            for (Map.Entry<String, Set<String>> m : thirdUrlMap.entrySet()) {
                Callable<BaseResult> baseResultCallable = new Callable<BaseResult>() {
                    @Override
                    public BaseResult call() throws Exception {
                        JSONObject paramObject = new JSONObject();
                        paramObject.put("orgDrugDetailQueryVoList",paramArray.toArray());
                        String url = m.getKey();
                        Set<String> orgCodeSet = m.getValue();
                        paramObject.put("orgCodes", orgCodeSet.toArray());
                        Map<String, String> headers = new HashMap<>();
                        headers.put("Content-Type", "application/json; charset=UTF-8");
                        BaseResult baseResult = HttpUtils.postJson(url,paramObject,headers,BaseResult.class);
                        return baseResult;
                    }
                };
                FutureTask<BaseResult> orgTask = new FutureTask<BaseResult>(baseResultCallable);
                taskList.add(orgTask);
                executorService.submit(orgTask);
            }
            taskList.forEach(task->{
                BaseResult baseResult = null;
                try {
                    baseResult = task.get();
                    logger.info("返回结果。。。"+JSONObject.toJSONString(
                            baseResult
                    ));
                }catch (Exception e){
                    logger.info("获取药店信息异常");
                }
                if (ZoeValidateUtils.isEmpty(baseResult)){
                    return;
                }
                int code = baseResult.getCode();
                if(ZoePublicConfig.ZEOR_INT.intValue() != code){
                    return;
                }
                JSONArray jsonArray = (JSONArray)baseResult.getData();
                int size = jsonArray.size();
                for(int i=0;i<size;i++) {
                    //获取机构药品信息
                    JSONObject jsonObject = jsonArray.getJSONObject(i);
                    OrgDrugInfoVo orgDrugInfoVo = new OrgDrugInfoVo();
                    orgDrugInfoVo.setOrgCode(jsonObject.getString("orgCode"));
                    logger.info("orgCode。。。" + jsonObject.getString("orgCode"));
    //                orgDrugInfoVo.setTotalPrice(jsonObject.getBigDecimal("totalPrice"));
                    orgDrugInfoVo.setTotalPrice(orgDrugVo.getTotalPrice());
                    logger.info("totalStockFlag。。。" + jsonObject.getBoolean("totalStockFlag"));
                    orgDrugInfoVo.setTotalStockFlag(jsonObject.getBoolean("totalStockFlag"));
                    orgDrugInfoVo.setPrescriptionId(orgDrugVo.getInterrogationId());
                    orgDrugInfoVo.setPrescriptionStatus(orgDrugVo.getPrescriptionStatus());
                    //药品订单详情直接从平台拿
                    if (!ZoeValidateUtils.isEmpty(drugOrderDetails)) {
                        orgDrugInfoVo.setDrugOrders(drugOrderDetails);
                        orgDrugInfoMap.put(orgDrugInfoVo.getOrgCode(), orgDrugInfoVo);
                        System.out.println("-------orgCode:" + orgDrugInfoVo.getOrgCode() + "------prescriptionStatus" + orgDrugInfoVo.getPrescriptionStatus());
                        continue;
                    }
    
                    if (ZoeValidateUtils.isEmpty(jsonObject.getJSONArray("orgDrugDetailVoList"))) {
                        orgDrugInfoMap.put(orgDrugInfoVo.getOrgCode(), orgDrugInfoVo);
                        continue;
                    }
                    JSONArray orgDrugArray = jsonObject.getJSONArray("orgDrugDetailVoList");
                    int orgDrugSize = orgDrugArray.size();
                    List<DrugOrderVo> drugOrders = new ArrayList<DrugOrderVo>();
                    for (int j = 0; j < orgDrugSize; j++) {
                        //设置每个药品信息
                        JSONObject orgDrugObject = orgDrugArray.getJSONObject(j);
                        DrugOrderVo drugOrderVo = new DrugOrderVo();
                        drugOrderVo.setDrugId(orgDrugObject.getString("drugNo"));
                        drugOrderVo.setDrugName(orgDrugObject.getString("drugName"));
                        drugOrderVo.setNum(new BigDecimal(orgDrugObject.getInteger("drugNum")));
                        drugOrderVo.setDrugPrice(orgDrugObject.getBigDecimal("drugPrice"));
                        drugOrderVo.setDrugTotalPrice(orgDrugObject.getBigDecimal("drugTotalPrice"));
                        drugOrderVo.setDrugUnit(orgDrugObject.getString("drugUnit"));
                        drugOrderVo.setSpecification(orgDrugObject.getString("specification"));
                        DrugOrderVo drug = drugMap.get(orgDrugObject.getString("drugNo"));
                        if (!ZoeValidateUtils.isEmpty(drug) && !ZoeValidateUtils.isEmpty(drug.getDrugPrice())) {
                            drugOrderVo.setDrugPrice(drug.getDrugPrice());
                            drugOrderVo.setDrugTotalPrice(drug.getDrugTotalPrice());
                        }
                        drugOrders.add(drugOrderVo);
                    }
                    orgDrugInfoVo.setDrugOrders(drugOrders);
                    orgDrugInfoMap.put(orgDrugInfoVo.getOrgCode(), orgDrugInfoVo);
                }
            });
    

    这样既保证了查询的异步,又因为在高并发下任务只执行一次,故在回调查询结果时可以对结果进行去重。保证结果唯一

    相关文章

      网友评论

        本文标题:J.U.C并发包之 FutrueTask

        本文链接:https://www.haomeiwen.com/subject/ctqohctx.html