美文网首页
java.util.concurrent 包下工具类的使用

java.util.concurrent 包下工具类的使用

作者: zhzhgang | 来源:发表于2018-04-01 20:56 被阅读0次

    CountDownLacth 的使用

    常用于监听某些初始化操作,等初始化执行完毕,通知主线程继续执行。
    先看示例代码:

    public class UseCountDownLatch {
    
        public static void main(String[] args) {
            
            final CountDownLatch countDown = new CountDownLatch(2);
            
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("进入线程t1" + "等待其他线程处理完成...");
                        countDown.await();
                        System.out.println("t1线程继续执行...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"t1");
            
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("t2线程进行初始化操作...");
                        Thread.sleep(3000);
                        System.out.println("t2线程初始化完毕,通知t1线程继续...");
                        countDown.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            Thread t3 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("t3线程进行初始化操作...");
                        Thread.sleep(4000);
                        System.out.println("t3线程初始化完毕,通知t1线程继续...");
                        countDown.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            
            t1.start();
            t2.start();
            t3.start();
        }
    }
    

    运行结果:

    进入线程t1等待其他线程处理完成...
    t2线程进行初始化操作...
    t3线程进行初始化操作...
    t2线程初始化完毕,通知t1线程继续...
    t3线程初始化完毕,通知t1线程继续...
    t1线程继续执行...
    

    CyclicBarrier

    假设有一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个没有准备好,大家一起等待。

    示例代码:

    public class UseCyclicBarrier {
    
        static class Runner implements Runnable {  
            private CyclicBarrier barrier;  
            private String name;  
            
            public Runner(CyclicBarrier barrier, String name) {  
                this.barrier = barrier;  
                this.name = name;  
            }  
            @Override  
            public void run() {  
                try {  
                    Thread.sleep(1000 * (new Random()).nextInt(5));  
                    System.out.println(name + " 准备OK.");  
                    barrier.await();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                } catch (BrokenBarrierException e) {  
                    e.printStackTrace();  
                }  
                System.out.println(name + " Go!!");  
            }  
        } 
        
        public static void main(String[] args) throws IOException, InterruptedException {  
            CyclicBarrier barrier = new CyclicBarrier(3);  // 3 
            ExecutorService executor = Executors.newFixedThreadPool(3);  
            
            executor.submit(new Thread(new Runner(barrier, "zhangsan")));  
            executor.submit(new Thread(new Runner(barrier, "lisi")));  
            executor.submit(new Thread(new Runner(barrier, "wangwu")));  
      
            executor.shutdown();  
        }  
      
    }
    

    运行结果示例:

    lisi 准备OK.
    zhangsan 准备OK.
    wangwu 准备OK.
    wangwu Go!!
    lisi Go!!
    zhangsan Go!!
    

    Callable 和 Future

    Future 模式非常适合在处理很耗时很长的业务逻辑中使用,可以有效地减少系统的响应时间,提高系统的吞吐量。

    示例代码:

    public class UseFuture implements Callable<String>{
        private String para;
        
        public UseFuture(String para){
            this.para = para;
        }
        
        /**
         * 这里是真实的业务逻辑,其执行可能很慢
         */
        @Override
        public String call() throws Exception {
            //模拟执行耗时
            Thread.sleep(5000);
            String result = this.para + "处理完成";
            return result;
        }
        
        //主控制函数
        public static void main(String[] args) throws Exception {
            String queryStr = "query";
            //构造FutureTask,并且传入需要真正进行业务逻辑处理的类,该类一定是实现了Callable接口的类
            FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));
            
            FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr));
            //创建一个固定线程的线程池且线程数为1,
            ExecutorService executor = Executors.newFixedThreadPool(2);
            //这里提交任务future,则开启线程执行RealData的call()方法执行
            //submit和execute的区别: 第一点是submit可以传入实现Callable接口的实例对象, 第二点是submit方法有返回值
            
            Future f1 = executor.submit(future);        //单独启动一个线程去执行的
            Future f2 = executor.submit(future2);
            System.out.println("请求完毕");
            
            try {
                //这里可以做额外的数据操作,也就是主程序执行其他业务逻辑
                System.out.println("处理实际的业务逻辑...");
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            //调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待
            System.out.println("数据:" + future.get());
            System.out.println("数据:" + future2.get());
            
            executor.shutdown();
        }
    
    }
    

    运行结果:

    请求完毕
    处理实际的业务逻辑...
    数据:query处理完成
    数据:query处理完成
    

    Semaphore

    Semaphore 信号量可以控制系统的流量,拿到信号量的线程可以进入,否则就等待。通过 acquire() 和 release() 获取和释放访问许可。

    示例代码:

    public class UseSemaphore {  
      
        public static void main(String[] args) {  
            // 线程池  
            ExecutorService exec = Executors.newCachedThreadPool();  
            // 只能5个线程同时访问  
            final Semaphore semp = new Semaphore(5);  
            // 模拟20个客户端访问  
            for (int index = 0; index < 20; index++) {  
                final int NO = index;  
                Runnable run = new Runnable() {  
                    public void run() {  
                        try {  
                            // 获取许可  
                            semp.acquire();  
                            System.out.println("Accessing: " + NO);  
                            //模拟实际业务逻辑
                            Thread.sleep((long) (Math.random() * 10000));  
                            // 访问完后,释放  
                            semp.release();  
                        } catch (InterruptedException e) {  
                        }  
                    }  
                };  
                exec.execute(run);  
            } 
            
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            //System.out.println(semp.getQueueLength());
           
            // 退出线程池  
            exec.shutdown();  
        }  
      
    }  
    

    运行结果:

    Accessing: 0
    Accessing: 4
    Accessing: 3
    Accessing: 2
    Accessing: 1
    Accessing: 5
    Accessing: 6
    Accessing: 7
    Accessing: 8
    Accessing: 9
    Accessing: 10
    Accessing: 11
    Accessing: 12
    Accessing: 13
    Accessing: 14
    Accessing: 15
    Accessing: 16
    Accessing: 17
    Accessing: 18
    Accessing: 19
    

    ReentrantLock

    重入锁,在需要进行同步的代码部分加上锁定,但不要忘记最后一定要释放锁定,不然会造成锁永远无法释放,其他线程永远进不来的结果。

    示例代码:

    public class UseReentrantLock {
        
        private Lock lock = new ReentrantLock();
        
        public void method1(){
            try {
                lock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method1..");
                Thread.sleep(1000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method1..");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                
                lock.unlock();
            }
        }
        
        public void method2(){
            try {
                lock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method2..");
                Thread.sleep(2000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method2..");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
    
            final UseReentrantLock ur = new UseReentrantLock();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    ur.method1();
                    ur.method2();
                }
            }, "t1");
    
            t1.start();
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //System.out.println(ur.lock.getQueueLength());
        }
        
        
    }
    

    运行结果:

    当前线程:t1进入method1..
    当前线程:t1退出method1..
    当前线程:t1进入method2..
    当前线程:t1退出method2..
    

    在使用 synchronized 时,如果需要在多线程间进行协作工作,则需要 Object 类的 wait() 和 notify()、notifyAll() 方法进行配合工作。

    同样,在使用 Lock 的时候,可以使用一个新的等待 / 通知的类,它就是 Condition,Condition 一定是针对具体某一把锁的,也就是只有在锁的基础上才会产生 Condition。

    示例代码:

    public class UseCondition {
    
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
        
        public void method1(){
            try {
                lock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入等待状态..");
                Thread.sleep(3000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "释放锁..");
                condition.await();  // Object wait
                System.out.println("当前线程:" + Thread.currentThread().getName() +"继续执行...");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void method2(){
            try {
                lock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入..");
                Thread.sleep(3000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "发出唤醒..");
                condition.signal();     //Object notify
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
            
            final UseCondition uc = new UseCondition();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    uc.method1();
                }
            }, "t1");
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    uc.method2();
                }
            }, "t2");
            t1.start();
    
            t2.start();
        }
        
        
        
    }
    

    运行结果:

    当前线程:t1进入等待状态..
    当前线程:t1释放锁..
    当前线程:t2进入..
    当前线程:t2发出唤醒..
    当前线程:t1继续执行...
    

    可以通过一个 Lock 对象产生多个 Condition 进行多线程间的交互,非常灵活,可以使得部分需要唤醒的线程唤醒,其他线程则继续等待通知。

    示例代码:

    public class UseManyCondition {
    
        private ReentrantLock lock = new ReentrantLock();
        private Condition c1 = lock.newCondition();
        private Condition c2 = lock.newCondition();
        
        public void m1(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m1等待..");
                c1.await();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m1继续..");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void m2(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m2等待..");
                c1.await();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m2继续..");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void m3(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m3等待..");
                c2.await();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m3继续..");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void m4(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
                c1.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void m5(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
                c2.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
            
            
            final UseManyCondition umc = new UseManyCondition();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m1();
                }
            },"t1");
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m2();
                }
            },"t2");
            Thread t3 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m3();
                }
            },"t3");
            Thread t4 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m4();
                }
            },"t4");
            Thread t5 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m5();
                }
            },"t5");
            
            t1.start(); // c1
            t2.start(); // c1
            t3.start(); // c2
            
    
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            t4.start(); // c1
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            t5.start(); // c2
            
        }
    }
    

    运行结果:

    当前线程:t1进入方法m1等待..
    当前线程:t2进入方法m2等待..
    当前线程:t3进入方法m3等待..
    当前线程:t4唤醒..
    当前线程:t1方法m1继续..
    当前线程:t2方法m2继续..
    当前线程:t5唤醒..
    当前线程:t3方法m3继续..
    

    ReentrantReadWriteLock

    读写锁,其核心就是实现读写分离的锁。在高并发访问下,尤其是读多写少的情况下,性能要远高于重入锁。读读共享,写写互斥,读写互斥。

    示例代码:

    public class UseReentrantReadWriteLock {
    
        private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        private ReadLock readLock = rwLock.readLock();
        private WriteLock writeLock = rwLock.writeLock();
        
        public void read(){
            try {
                readLock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
                Thread.sleep(3000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readLock.unlock();
            }
        }
        
        public void write(){
            try {
                writeLock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
                Thread.sleep(3000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                writeLock.unlock();
            }
        }
        
        public static void main(String[] args) {
            
            final UseReentrantReadWriteLock urrw = new UseReentrantReadWriteLock();
            
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    urrw.read();
                }
            }, "t1");
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    urrw.read();
                }
            }, "t2");
            Thread t3 = new Thread(new Runnable() {
                @Override
                public void run() {
                    urrw.write();
                }
            }, "t3");
            Thread t4 = new Thread(new Runnable() {
                @Override
                public void run() {
                    urrw.write();
                }
            }, "t4");       
            
    //      t1.start();
    //      t2.start();
            
    //      t1.start(); // R 
    //      t3.start(); // W
            
            t3.start();
            t4.start(); 
        }
    }
    

    运行结果:

    当前线程:t3进入...
    当前线程:t3退出...
    当前线程:t4进入...
    当前线程:t4退出...
    

    相关文章

      网友评论

          本文标题:java.util.concurrent 包下工具类的使用

          本文链接:https://www.haomeiwen.com/subject/xpsocftx.html