美文网首页
Java高并发程序设计

Java高并发程序设计

作者: 都有米 | 来源:发表于2018-08-19 01:28 被阅读225次

    线程既可共享进程的资源(内存地址、文件I/O等),又可以独立调度(线程是CPU调度的最基本单位)。

    一、基本概念

    1. 同步(Synchronized)和异步(Asynchronized)

    同步:调用者必须等到方法调用返回后,才能继续后续操作;
    异步:方法调用会立刻返回,调用者可以继续后续操作。

    1. 并发(Concurrency)和并行(Parallelism)

    并发:多个任务交替执行,任务之间可能还是串行;
    并行:真正意义上的“同时执行”。单CPU只能并发,多核才能并行。

    1. 临界区

    可以被多个线程使用的公共资源,即共享数据。

    1. 阻塞(Blocking)和非阻塞(Non-Blocking)

    阻塞:线程A占用了临界区资源,其他需要这个资源的线程就会在临界区中等待,等待会导致线程挂起,这就是阻塞。
    非阻塞:线程不会妨碍其他线程的执行,所有线程都会尝试不断向前执行。

    1. 死锁(Deadlock)、饥饿(Starvation)和活锁(Livelock)

    死锁:彼此占有对方需要的资源,都不释放;
    饥饿:线程因种种原因无法获取所需资源,一直无法执行;
    活锁:彼此占有对方需要的资源,都主动释放,但是没有一个线程同时拿到所有资源而正常执行,出现资源不断在两个线程之间跳动。

    1. 并发级别【*】

    阻塞:关键字synchronized或者重入锁,就是阻塞线程。
    无饥饿:公平锁,就不会产生饥饿。
    无障碍:无障碍执行,检测到冲突就回滚操作。
    无锁:一个无穷循环直到修改成功。compareAndSet
    无等待:典型无等待结构RCU(Read-Copy-Update) 。

    1. Java的内存模型(JMM)
    1. 原子性(Atomicity):指一个操作不可中断。
    2. 可见性(Visibility):线程A修改共享变量后线程B是否能立刻知道。
    3. 有序性(Ordering):指令重排保证串行语义一致,但无法保证多线程间语义一致。

    Happen-Before规则(不重排指令):volatile变量写操作先与读操作、锁规则、顺序原则...

    1. 进程(Process)和线程(Thread)

    进程:系统进行资源分配和调度的基本单位。进程是线程的容器,进程是程序的实体。
    线程:轻量级进程,程序执行的最小单位。

    1. 线程相关概念

    线程组:
    守护线程(Daemon):垃圾回收线程、JIT线程
    线程优先级:优先级高的线程在竞争资源时更有优势。使用1-10表示线程优先级,数字越大,优先级越高。

    二、线程的状态

    线程状态转换关系

    线程在某个时间点只可能处于以下一种状态,这个状态是虚拟机线程状态,不是操作系统线程状态。

    • NEW :线程被创建,但是还没有调用start方法。
    • RUNNABLE:JVM正在执行的线程
    • BLOCKED:正在等待锁
    • WAITING:正在无限期等待其他线程执行某些操作的线程
    • TIMED_WAITING:正在等待其他线程执行某些操作,某个时间点会等待超时的线程
    • TERMINATED:已经退出执行的线程
        public enum State {
            /**
             * 还没有调用start方法启动的线程处于NEW状态
             */
            NEW,
    
            /** 
             * 1、JVM正在执行的线程;
             * 2、可执行但是未执行,因为系统资源,如CPU忙等原因,正在等待被操作系统调度执行的线程
             */
            RUNNABLE,
    
            /**
             * 正在等待锁资源,准备进入同步代码块的线程
             * 或者调用Object.wait后(其他线程已经notify了)准备重新进入同步代码块的线程
             */
            BLOCKED,
    
            /**
             * 因为调用Object.wait、Thread.join、LockSupport.park而导致线程处于等待状态
             * 处于等待状态的线程需要其他线程执行特定的操作才能被唤醒。
             * 如,调用Object.wait的线程需要其他线程调用Object.notify()或者Object.notifyAll()方法来唤醒;调用Thread.join() 的线程需要对应线程执行结束才能被唤醒。
             */
            WAITING,
    
            /**
             * 处于等待状态,但是有等待超时的线程。
             * 通常是调用了Thread.sleep(long)、Object.wait(long)、Thread.join(long)、LockSupport.parkNanos、LockSupport.parkUntil方法
             */
            TIMED_WAITING,
    
            /**
             * 已经执行完成的线程
             */
            TERMINATED;
        }
    

    三、线程的基本操作

    1. 创建线程:Thread和Runnable、run()和start()的区别;
    2. 终止线程

    • 不要使用stop()方法,可能导致共享数据异常。如线程A持有锁,写共享数据,写到一半被stop,线程A立刻结束并释放锁,这时线程B获得锁,读取的共享数据就是写了一半的异常数据。
    • 使用标志位停止线程:wait、sleep情况只能等待,无法及时停止
    • 使用线程中断:wait、sleep时被中断会抛出InterruptedException
    public void Thread.interrupt() //中断线程
    public void Thread.isInterrupted() //判断线程是否被中断
    public void Thread.interrupted() //判断线程是否被中断,并清除当前中断状态
    
    while (true){
        if (Thread.currentThread().isInterrupted()) {
            break;
        }
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // 抛出InterruptedException时会清除掉当前线程的中断状态
            // 想要循环检测到中断状态,就需要在catch里再次设置中断状态
            Thread.currentThread().interrupt();
        }
    }
    

    3. 等待(wait)和通知(notify)

    public final native void Object.wait() throws InterruptedException;
    public final native void Object.notify();
    

    wait方法不是可以随便调用,必须在对应的同步代码块里,wait或者notify都需要先获得目标对象的监视器。线程执行wait方法前必须先获得对应Object的监视器,wait方法执行后会释放监视器,这时其他线程就可以获取这个Object的监视器了。这就实现了线程间通信。

    wait和sleep都是让线程等待。wait会释放目标对象锁,sleep不会释放任何资源

    4. 挂起(suspend)和继续执行(resume)线程
    suspend和resume是一对相关的操作,也已经废弃了,不推荐使用。相关需求可以用wait和notify来实现。

    • suspend挂起线程后不会释放任何资源,其他等待被占用资源的线程都无法执行,使用不当会导致所有相关线程都无法运行。
    • suspend挂起线程后,线程还是Runnable状态,影响问题分析。

    5. 等待线程结束(join)和谦让(yield)

    public final void Thread.join() throws InterruptedException
    public static native void Thread.yield();
    
    • join方法会阻塞当前线程,直到目标线程执行结束。本质就是让当前线程wait()在目标线程对象上,目标线程执行完成后会调用notifyAll通知所有等待线程继续执行。
    • Thread.yield()方法会让当前线程让出CPU,不过让出后还会进行CPU资源的争夺,能否再次分配到就看系统了。

    五、JDK并发包

    JDK并发包分三大部分:同步控制、线程池、并发集合

    5.1、同步控制

    常见的同步控制手段synchronized(可重入)、Object.wait()、Object.notify()。重入锁ReentrantLock,需要显式地lock,unlock,比synchronized灵活。之所以叫重入锁,是因为同一个线程可以连续多次获得同一把锁(如果没有这个特性,线程可能自己把自己锁死),需要注意的是线程获取了几次锁,就得释放几次。释放多了会报IllegalMonitorStateException,释放次数少了相当于还持有锁。

    5.1.1 ReentrantLock的优点:
    • 中断响应
    ReentrantLock lock = new ReentrantLock();
    try {
      lock.lockInterruptibly();
      ...
    } catch(InterruptedException e) {
      ...
    } finally {
      if(lock.isHeldByCurrentThread()){
        lock.unlock();
      }
    }
    

    synchronized:在等待锁时,要么获得锁,要么一直死等;
    ReentrantLock:有第三种选择,其他线程可以调用threadObj.interrupt()来中断正在等待锁的线程。

    • 锁申请等待时限
    ReentrantLock lock = new ReentrantLock();
    try {
      if(lock.tryLock(5, TimeUnit.SECONDS){
        ...
      }
    } catch(InterruptedException e) {
      ...
    } finally {
      if(lock.isHeldByCurrentThread()){
        lock.unlock();
      }
    }
    

    上面的例子,如果在5s内都没有获得锁就返回false。tryLock()也可以不带参数,这种情况,获得锁立刻返回true,没有获得就立刻返回false。

    • 公平锁
      一般情况系统是从锁的等待队列中随机挑选一个,这不能保证公平性。公平锁的一大特征是:不会产生饥饿现象。
    ReentrantLock fairLock = new ReentrantLock(true);
    try {
      fairLock.lock();
      ...
    } finally {
      fairLock.unlock();
    }
    
    5.1.2 实现可重入锁的三要素
    • 原子状态使用CAS操作来存储当前锁的状态。
    • 等待队列,没有获得锁的线程会进入等待队列,等锁释放后再唤醒一个线程,继续工作。
    • 阻塞原语park()和unpark(),用来挂起和恢复线程。
    5.1.3 重入锁的好搭档:Condition

    object.wait()/notify()是和关键字synchronized配合使用。如下例子

    private Object lockObj = new Object();
    private void threadA(){
        synchronized (lockObj){
            try {
                //线程会释放锁,进入等待状态
                lockObj.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    private void threadB(){
        synchronized (lockObj){
            lockObj.notify();
        }
    }
    

    Condition和他们功能基本一致,只不过是和ReentrantLock配合使用。如下列子:

    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private void threadA(){
        lock.lock();
        try {
            //线程会释放锁,进入等待状态
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    
    private void threadB(){
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
    
    5.1.4 信号量(Semaphore)

    不管是synchronized还是ReentrantLock,一次都只允许一个线程访问资源。信号量可以指定多个线程同时访问某一资源。

    private Semaphore semaphore = new Semaphore(5);
    private void run(){
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
          semaphore.release();  
        }
    }
    
    5.1.5 读写锁(ReadWriteLock)

    读与读之间并发,读与写、写与写之间同步。

    private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private Lock readLock = readWriteLock.readLock();
    private Lock writeLock = readWriteLock.writeLock();
    private void read(){
        readLock.lock();
        try {
            ...
        }finally {
            readLock.unlock();
        }
    }
    
    private void write(){
        writeLock.lock();
        try {
            ...
        }finally {
            writeLock.unlock();
        }
    }
    
    5.1.6 倒计时器(CountDownLatch)

    这是一个非常实用的多线程控制工具类,通常用来控制线程等待,它可以让一个线程等待倒计时结束,再开始执行。

    private CountDownLatch countDownLatch = new CountDownLatch(6);
    private void thread(){
        countDownLatch.countDown();
    }
    
    private void main(){
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ...
    }
    
    5.1.7 循环栅栏(CyclicBarrier)

    CyclicBarrierCountDownLatch很像。CountDownLatch倒计时完成后,数目无法恢复,不能再次倒计时了。CyclicBarrier倒计时完成后,又会恢复初始状态,可循环使用。

    private CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
    private void thread(){
        try {
            cyclicBarrier.await();
            //6个线程都调用了await了, do sth
            ...
            //再次等待6个线程执行完上面的代码
            cyclicBarrier.await();
            //当一个线程收到InterruptedException后,
            //其他5个线程会收到BrokenBarrierException,意思就是不用等了,永远等不到第6个线程了
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
    
    5.1.8 线程阻塞工具类(LockSupport)
    class MyThread extends Thread{
        @Override
        public void run() {
            //中断不会抛出异常,而是直接返回,需要我们自己判断是否是中断返回
            LockSupport.park();
            if (Thread.interrupted()) {
                //被中断了
            }
        }
    }
    
    private void main(){
        LockSupport.unpark(myThreadObject);
    }
    

    LockSupport.park()可以在线程任意位置让线程阻塞。比较下同样是阻塞线程的Thread.suspend()Object.wait()
    Thread.suspend():不释放资源,调用后处于Runnable状态,如果对应的Thread.resume()先调用了,则导致所有相关线程无法执行。LockSupport.park()如果在对应的LockSupport.unpark()方法后才执行,那park()会立刻返回不会阻塞线程,park()阻塞线程后,线程时Waiting状态。
    Object.wait():需要先获得对象锁,等待过程中可能有InterruptedException异常。LockSupport.park()不需要对象锁,中断时不会抛出异常,而是直接返回,需要我们判断是否是中断返回。

    5.2、线程池

    • ThreadPool
    • 分而治之:Fork/Join框架

    5.3、JDK的并发容器

    • java.util:Hashtable、Vector、Collections(synchronizedList),可实现并发,但性能不佳,锁粒度太粗。
    • java.util.concurrent:ConcurrentHashMap、ConcurrentLinkedQueue(高并发环境中性能最好的队列)、CopyOnWriteArrayList、BlockingQueue、ConcurrentSkipListMap
    • java.util.concurrent.atomic:无锁线程安全的原子包装类

    5.4、锁优化及其注意事项

    5.4.1、使用锁时需要注意的点
    • 减少锁持有的时间

    减少锁的持有时间,有助于降低锁冲突的可能性,进而提升系统的并发能力。意思是不要无脑锁住整个方法,要分析出需要同步的代码,对有必要同步的步骤加锁即可。

    • 减小锁粒度

    所谓减小锁粒度,就是指缩小锁定对象的范围,从而减少锁冲突的可能性,进而提升系统的并发能力。eg、ConcurrentHashMap的实现,put操作并不是对整个Map加锁,而是先计算hash值得到该项数据要被存放到那个段中,然后对该段加锁。由于默认有16个段,因此,如果足够幸运,ConcurrentHashMap可以接受16个线程同插入数据。

    • 读写锁

    读写、写写之间互斥;读读之间并发。

    • 锁分离

    BlockingQueue的take操作和put操作使用了两个不同的锁,因为take是读的队列头,put是操作的队列尾,理论上这两个操作是不冲突的,所以锁分离优化原则,使用了两个不同的锁。

    • 锁粗化

    锁优化原则是希望锁持有时间尽量短,但是如果程序会不停的请求同一个锁,如在循环结构中请求锁,这样反而不利于性能优化,所以可以锁粗化,把锁请求放到循环外边去。

    5.4.2、虚拟机对锁优化所做的努力
    • 偏向锁
    • 轻量级锁
    • 自旋锁
    • 消除锁
    5.4.3、ThreadLocal

    ThreadLocal<Value>变量原理:Thread对象中有一个ThreadLocalMap<threadLocalObject, valueObject>集合,里面存放着在对应线程中创建的ThreadLocal<Value>变量。ThreadLocalMap<threadLocalObject, valueObject>集合的key是ThreadLocal对象,value是ThreadLocal对象中存储的我们需要的value。

    注意:ThreadLocal变量是存放在Thread对象中的,一般情况在线程执行结束后jvm会清除相关变量,但是线程池这种情况线程不结束,那么很有可能会保存大量的ThreadLocal变量导致内存泄漏。所以当我们不再使用某个ThreadLocal变量时,应该使用ThreadLocal.remove()来移除,防止内存泄漏。

    5.4.4、无锁

    对于并发控制来说,前面提到的加锁都是悲观策略,它总假设每次临界区操作都会产生冲突。悲观策略最主要的问题就是线程阻塞和唤醒带来的性能问题。还有一种乐观策略无锁并发控制,他总是认为并发总是小概率事件,所以对临界资源大胆操作就好,真的出现问题后再修正就好。无锁并发的优势:

    1. 不需要阻塞线程,不涉及线程调度,所以程序有更好的性能;
    2. 天生免疫死锁。
    • 比较交换(CAS)
      CAS算法过程:CAS(Var, Old, New)更新Var变量,Old表示预期值,New表示新值。只有当Var的值等于Old时,才会将Var的值更新为New。多个线程同时调用CAS方法更新同一个变量时,只有一个线程会更新成功,其他线程会更新失败直接返回,不会阻塞线程。在硬件>上大部分处理器支持原子化的CAS指令。
    `Unsafe`:
    sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    // 获取`AtomicInteger`对象的字段`value`在内存中存储位置距离`AtomicInteger`对象头部的偏移量。
    long offset = U.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
    // 获得给定对象偏移量上的int值
    public native int getInt(Object o, long offset);
    

    AtomicIntegerAtomicReference、带时间戳的原子引用类AtomicStampedReference、原子数组AtomicIntegerArray、把普通变量升级成原子包装类AtomicIntegerFieldUpdater(还有long、reference)

    数组可以使用Unsafe类通过CAS操作方式保证线程安全,因为数组在内存上是连续的内存空间,可以通过内存指针偏移的方式定位到数组元素。

    细看SynchronousQueue的实现。

    5.4.5、无同步方案

    同步只是保障共享数据争用时的正确性的手段,如果一个方法本来就不涉及共享数据,那就无须线程同步。final、Immutable

    六、并行模式和算法

    并行程序设计比串行程序复杂多了,因此我们需要熟悉一些常见的并行程序设计方法。

    6.1 单例模式
    public class Singleton {
        // 单例防止被外部创建
        private Singleton(){}
    
        // volatile 关键字保证原子性,有序性
        public static volatile Singleton instance;
        
        public static Singleton getInstance(){
            // double-check,保证正确性和效率
            if(instance == null){
                synchronized(Singleton.class){
                    if(instance == null){
                        instance = new Singleton();
                    }
                }
            }
    
            return instance;
        }
    }
    

    最简单、效率最高、代码最美观

    public class Singleton {
        // 单例防止被外部创建
        private Singleton(){}
        
        private static class SingletonHolder{
            private static Singleton staticInstance = new Singleton();
        }
    
        public static Singleton getInstance(){
            return SingletonHolder.staticInstance;
        }
    }
    
    6.2 不变模式
    6.3 生产者-消费者模式
    6.4 Future模式
    6.5 并行流水线
    6.6 并行搜索
    6.7 并行排序
    6.8 并行算法:矩阵乘法
    6.9 网络NIO(准备好了再通知我)
    6.2 AIO(读完了再通知我)

    相关文章

      网友评论

          本文标题:Java高并发程序设计

          本文链接:https://www.haomeiwen.com/subject/zhzojftx.html