Java 并发编程进阶

作者: FlySheep_ly | 来源:发表于2017-03-31 08:59 被阅读275次

    一、Executor 框架

    为了更好的控制多线程,JDK 提供了一套线程框架 Executor,帮助开发人员有效地进行线程控制。它们都在 java.util.concurrent 包中,是 JDK 并发包的核心。其中有一个比较重要的类:Executors,它扮演着线程工厂的角色,我们通过 Executors 可以创建特定功能的线程池。
      Executors 创建线程池的方法:

    • newFixedThreadPool() 方法,该方法返回一个固定数量的线程池,该方法的线程数始终不变,当有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在一个任务队列中等待有空闲的线程去执行。
    • newSingleThreadExecutor() 方法,创建只有一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。
    • newCachedThreadPool() 方法,返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若有空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在 60 秒后自动回收。
    • newScheduledThreadPool() 方法,该方法返回一个 ScheduledExecutorService 对象,但该线程池可以指定线程的数量。

    二、自定义线程池

    若 Executors 工厂类无法满足我们的需求,可以自己去创建自定义的线程池,其实 Executors 工厂类里面的创建线程方法其内部实现均是用了 ThreadPoolExecutor 这个类,这个类可以自定义线程。构造方法如下:

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    这个构造方法对于队列是什么类型的比较关键:
      在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于 corePoolSize,则优先创建线程,若大于 corePoolSize,则会将任务加入队列,若队列已满,则在总线程数不大于 maximumPoolSize 的前提下,创建新的线程,若线程数大于 maximumPoolSize,则执行拒绝策略。或其它自定义方式。
      在使用无界的任务队列时:LinkedBlockQueue。与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新任务到来,系统的线程数小于 corePoolSize 时,则新建线程执行任务。当达到 corePoolSize 后,就不会继续增加。若后续仍有新的任务加入,而又没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。
      JDK 拒绝策略:
      AbortPolicy:直接抛出异常组织系统正常工作。
      CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。
      DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务。
      DisardPolicy:丢弃无法处理的任务,不给予任何处理。
      如果需要自定义拒绝策略可以实现 RejectedExecutionHandler 接口。

    三、Concurrent.util 常用类

    1. CyclicBarrier 使用

    假设有这样一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家都等待。

    import java.io.IOException;  
    import java.util.Random;  
    import java.util.concurrent.BrokenBarrierException;  
    import java.util.concurrent.CyclicBarrier;  
    import java.util.concurrent.ExecutorService;  
    import java.util.concurrent.Executors; 
    public class UseCyclicBarrier {
    
        static class Runner implements Runnable {  
            private CyclicBarrier barrier;  
            private String name;  
            
            public Runner(CyclicBarrier barrier, String name) {  
                this.barrier = barrier;  
                this.name = name;  
            }  
            @Override  
            public void run() {  
                try {  
                    Thread.sleep(1000 * (new Random()).nextInt(5));  
                    System.out.println(name + " 准备OK.");  
                    barrier.await();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                } catch (BrokenBarrierException e) {  
                    e.printStackTrace();  
                }  
                System.out.println(name + " Go!!");  
            }  
        } 
        
        public static void main(String[] args) throws IOException, InterruptedException {  
            CyclicBarrier barrier = new CyclicBarrier(3);  // 3 
            ExecutorService executor = Executors.newFixedThreadPool(3);  
            
            executor.submit(new Thread(new Runner(barrier, "zhangsan")));  
            executor.submit(new Thread(new Runner(barrier, "lisi")));  
            executor.submit(new Thread(new Runner(barrier, "wangwu")));  
      
            executor.shutdown();  
        }  
      
    }  
    

    2. CountDownLacth 使用

    它经常用于监听某些初始化动作,等初始化执行完毕后,通知主线程继续工作。

    public class UseCountDownLatch {
    
        public static void main(String[] args) {
            
            final CountDownLatch countDown = new CountDownLatch(2);
            
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("进入线程t1" + "等待其他线程处理完成...");
                        countDown.await();
                        System.out.println("t1线程继续执行...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"t1");
            
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("t2线程进行初始化操作...");
                        Thread.sleep(3000);
                        System.out.println("t2线程初始化完毕,通知t1线程继续...");
                        countDown.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            Thread t3 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("t3线程进行初始化操作...");
                        Thread.sleep(4000);
                        System.out.println("t3线程初始化完毕,通知t1线程继续...");
                        countDown.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            
            t1.start();
            t2.start();
            t3.start();
        }
    }
    

    3. Callable 和 Future 使用

    Future 模式非常适合在处理很耗时很长的业务逻辑时进行使用,可以有效的减少系统的响应时间,提高系统的吞吐量。

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    
    public class UseFuture implements Callable<String>{
        private String para;
        
        public UseFuture(String para){
            this.para = para;
        }
        
        /**
         * 这里是真实的业务逻辑,其执行可能很慢
         */
        @Override
        public String call() throws Exception {
            //模拟执行耗时
            Thread.sleep(5000);
            String result = this.para + "处理完成";
            return result;
        }
        
        //主控制函数
        public static void main(String[] args) throws Exception {
            String queryStr = "query";
            //构造FutureTask,并且传入需要真正进行业务逻辑处理的类,该类一定是实现了Callable接口的类
            FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));
            
            FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr));
            //创建一个固定线程的线程池且线程数为1,
            ExecutorService executor = Executors.newFixedThreadPool(2);
            //这里提交任务future,则开启线程执行RealData的call()方法执行
            //submit和execute的区别: 第一点是submit可以传入实现Callable接口的实例对象, 第二点是submit方法有返回值
            
            Future f1 = executor.submit(future);        //单独启动一个线程去执行的
            Future f2 = executor.submit(future2);
            System.out.println("请求完毕");
            
            try {
                //这里可以做额外的数据操作,也就是主程序执行其他业务逻辑
                System.out.println("处理实际的业务逻辑...");
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            //调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待
            System.out.println("数据:" + future.get());
            System.out.println("数据:" + future2.get());
            
            executor.shutdown();
        }
    
    }
    

    4. 信号量

    Semaphore 信号量非常适合高并发访问,新系统在上线之前,要对系统的访问量进行评估,当然这个值肯定不是随便拍拍脑袋就能想出来的,是经过以往的经验、数据、历年的访问量以及推广力度进行一个合理的评估,当然评估标准不能太大也不能太小,太大的话投入的资源达不到实际效果,纯粹浪费资源,太小的话,某个时间点一个高峰值的访问量上来直接可以压垮系统。
      相关概念:
      PV:(Page View)网站的总访问量,页面浏览量或点击量,用户每刷新一次就会被记录一次。
      UV:(Unique Visitor)访问网站的一台电脑客户端为一个访客。一般来讲,时间上以 00:00 - 24:00 之内相同 IP 的客户端只记录一次。
      QPS:(Query Per Second)即每秒查询数,qps 很大程度上代表了系统业务上的繁忙程度,每次请求的背后,可能对应着多次磁盘 I/O,多次网络请求,多个 CPU 时间片等。我们通过 qps 可以非常直观的了解当前系统业务情况,一旦当前 qps 超过所设定的预警阈值,可以考虑增加机器对集群扩容,以免压力过大导致宕机,可以根据前期的压力测试得到估值,在结合后期综合运维情况,估算出阈值。
      RT:(Response Time)即请求的响应时间,这个指标非常关键,直接说明前端用户的体验,因此任何系统设计师都想降低 rt 时间。
      当然还涉及 CPU、内存、网络、磁盘等情况,更细节的问题很多,如 select、update、delete/ps 等数据库层面的统计。
      容量评估:一般来说通过开发、运维、测试、以及业务等相关人员,综合出系统的一系列阈值,然后我们根据关键阈值如 QPS、rt 等,对系统进行有效的变更。
      一般来讲,我们进行多轮压力测试后,可以对系统进行峰值评估,采用所谓的 80/20 原则,即 80% 的访问请求将在 20% 的时间内达到。这样我们可以根据系统对应的 PV 计算出峰值 QPS。
      峰值 QPS = (总PV ✖️ 80%)/(60 ✖️ 60 ✖️ 24 ✖️ 20%)
      然后将总的峰值 QPS 除以单台机器所能承受的最高的 QPS 值,就是所需要机器的数量:机器数 = 总的峰值 QPS / 压测得出的单机极限 QPS
      当然不排除系统在上线前进行大型促销活动,或者双十一、双十二热点事件、遭受到 DDoS 攻击等情况,系统的开发和运维人员急需要了解当前系统运行的状态和负载情况,一般都会有后台系统去维护。
      Semaphore 可以控制系统的流量:拿到信号量的线程可以进入,否则就等待。通过 acruire() 和 release() 获取和释放访问许可。

    import java.util.concurrent.ExecutorService;  
    import java.util.concurrent.Executors;  
    import java.util.concurrent.Semaphore;  
      
    public class UseSemaphore {  
      
        public static void main(String[] args) {  
            // 线程池  
            ExecutorService exec = Executors.newCachedThreadPool();  
            // 只能5个线程同时访问  
            final Semaphore semp = new Semaphore(5);  
            // 模拟20个客户端访问  
            for (int index = 0; index < 20; index++) {  
                final int NO = index;  
                Runnable run = new Runnable() {  
                    public void run() {  
                        try {  
                            // 获取许可  
                            semp.acquire();  
                            System.out.println("Accessing: " + NO);  
                            //模拟实际业务逻辑
                            Thread.sleep((long) (Math.random() * 10000));  
                            // 访问完后,释放  
                            semp.release();  
                        } catch (InterruptedException e) {  
                        }  
                    }  
                };  
                exec.execute(run);  
            } 
            
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            //System.out.println(semp.getQueueLength());
            
            
            
            // 退出线程池  
            exec.shutdown();  
        }  
      
    }  
    

    四、锁

    在 Java 多线程中,我们知道可以使用 synchronized 关键字来实现线程间的同步互斥工作,那么其实还有一个更优秀的机制去完成这个“同步互斥”工作,它就是 Lock 对象,这里主要介绍两种锁:重入锁和读写锁。它们具有比 synchronized 更为强大的功能,并且有嗅探锁定、多路分支等功能。

    1. ReentrantLock(重入锁)

    重入锁在需要进行同步的代码部分加上锁定,但不要忘记最后一定要释放锁定,不然会造成锁永远无法释放,其它线程永远进不来的结果。

    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class UseReentrantLock {
        
        private Lock lock = new ReentrantLock();
        
        public void method1(){
            try {
                lock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method1..");
                Thread.sleep(1000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method1..");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                
                lock.unlock();
            }
        }
        
        public void method2(){
            try {
                lock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method2..");
                Thread.sleep(2000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method2..");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
    
            final UseReentrantLock ur = new UseReentrantLock();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    ur.method1();
                    ur.method2();
                }
            }, "t1");
    
            t1.start();
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //System.out.println(ur.lock.getQueueLength());
        }
    }
    

    2. 锁与等待/通知

    在使用 synchronized 的时候,如果需要多线程间进行协作工作则需要 Object 的 wait() 和 notify()、notifyAll() 方法进行配合工作。
      那么同样,在使用 Lock 的时候,可以使用一个新的等待/通知的类,它就是 Condition 。这个 Condition 一定是针对具体某一把锁的。也就是在只有锁的基础之上才会产生 Condition。

    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class UseCondition {
    
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
        
        public void method1(){
            try {
                lock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入等待状态..");
                Thread.sleep(3000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "释放锁..");
                condition.await();  // Object wait
                System.out.println("当前线程:" + Thread.currentThread().getName() +"继续执行...");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void method2(){
            try {
                lock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入..");
                Thread.sleep(3000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "发出唤醒..");
                condition.signal();     //Object notify
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
            
            final UseCondition uc = new UseCondition();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    uc.method1();
                }
            }, "t1");
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    uc.method2();
                }
            }, "t2");
            t1.start();
    
            t2.start();
        }
    }
    

    3. 多 Condition

    我们可以通过一个 Lock 对象产生多个 Condition 进行多线程间的交互,非常的灵活。可以使得部分需要唤醒的线程唤醒,其它线程则继续等待通知。

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class UseManyCondition {
    
        private ReentrantLock lock = new ReentrantLock();
        private Condition c1 = lock.newCondition();
        private Condition c2 = lock.newCondition();
        
        public void m1(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m1等待..");
                c1.await();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m1继续..");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void m2(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m2等待..");
                c1.await();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m2继续..");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void m3(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m3等待..");
                c2.await();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m3继续..");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void m4(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
                c1.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public void m5(){
            try {
                lock.lock();
                System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
                c2.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public static void main(String[] args) {
            
            
            final UseManyCondition umc = new UseManyCondition();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m1();
                }
            },"t1");
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m2();
                }
            },"t2");
            Thread t3 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m3();
                }
            },"t3");
            Thread t4 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m4();
                }
            },"t4");
            Thread t5 = new Thread(new Runnable() {
                @Override
                public void run() {
                    umc.m5();
                }
            },"t5");
            
            t1.start(); // c1
            t2.start(); // c1
            t3.start(); // c2
            
    
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            t4.start(); // c1
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            t5.start(); // c2
            
        }
    }
    

    4. Lock / Condition 其它方法和用法

    公平锁和非公平锁:
    Lock lock = new ReentrantLock(boolean isFair);
    
    lock 用法:
    tryLock():尝试获得锁,获得结果用 true/false 返回。
    tryLock():在给定时间内尝试获得锁,获得结果用 true/false 返回。
    isFair():是否是公平锁。
    isLocked():是否锁定。
    getHoldCount():查询当前线程保持此锁的个数,也就是调用 lock() 次数。
    lockInterruptibly():优先响应中断锁。
    getQueueLength():返回正在等待获取此锁定的线程数。
    getWaitQueueLength():返回等待与锁定相关的给定条件 Condition 的线程数。
    hasQueueThread(Thread thread):查询指定的线程是否正在等待此锁。
    hasQueueThreads():查询是否有线程正在等待此锁。
    hasWaiters():查询是否有线程正在等待与此锁定有关的condition 条件。
    

    5. ReentrantReadWriteLock(读写锁)

    读写锁 ReentrantReadWriteLock,其核心就是实现读写分离的锁。在高并发访问下,尤其是读多写少的情况下,性能要远高于重入锁。
      使用 synchronized、ReentrantLock时,同一时间内,只能有一个线程进行访问被锁定的代码,那么读写锁则不同,其本质是分成两个锁,即读锁、写锁。在读锁下,多个线程可以并发的进行访问,但是在写锁的时候,只能一个一个的顺序访问。
      口诀:读读共享,写写互斥,读写互斥

    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
    
    public class UseReentrantReadWriteLock {
    
        private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        private ReadLock readLock = rwLock.readLock();
        private WriteLock writeLock = rwLock.writeLock();
        
        public void read(){
            try {
                readLock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
                Thread.sleep(3000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readLock.unlock();
            }
        }
        
        public void write(){
            try {
                writeLock.lock();
                System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
                Thread.sleep(3000);
                System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                writeLock.unlock();
            }
        }
        
        public static void main(String[] args) {
            
            final UseReentrantReadWriteLock urrw = new UseReentrantReadWriteLock();
            
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    urrw.read();
                }
            }, "t1");
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    urrw.read();
                }
            }, "t2");
            Thread t3 = new Thread(new Runnable() {
                @Override
                public void run() {
                    urrw.write();
                }
            }, "t3");
            Thread t4 = new Thread(new Runnable() {
                @Override
                public void run() {
                    urrw.write();
                }
            }, "t4");       
            
    //      t1.start();
    //      t2.start();
            
    //      t1.start(); // R 
    //      t3.start(); // W
            
            t3.start();
            t4.start();
        }
    }
    

    6. 锁的优化

    • 避免死锁
    • 减少锁的持有时间
    • 减少锁的粒度
    • 锁的分离
    • 尽量使用无锁的操作,如原子操作(Atomic 系列类),volatile 关键字。

    相关文章

      网友评论

        本文标题:Java 并发编程进阶

        本文链接:https://www.haomeiwen.com/subject/khqeottx.html