美文网首页
java并发基础

java并发基础

作者: wxxhfg | 来源:发表于2021-07-10 21:17 被阅读0次

    创建线程的方式

    创建线程主要有三种方法 : 继承Thread类,实现Runnable接口,实现Callable接口。

    方法一,直接使用Thread

     //创建线程
            Thread t = new Thread(){
                
                @Override
                public  void  run(){
                    System.out.println("======");
                }
            };
             t.setName("yi");
            //启动线程
            t.start();
    

    t.start()调用后才能和操作系统关联起来

    方法二,使用 Runnable 配合Thread

    Runnable runnable = () -> {
                System.out.println("---------------");
                System.out.println("12");
                System.out.println("12");
                System.out.println("12");
                System.out.println("12");
            };
    
            Thread t1 = new Thread(runnable);
            t1.start();
    

    方法三,实现Callable接口

    这种方式创建线程是可以获得子线程返回结果的,是Runnable的加强版。

    public class ThreadThree implements Callable<Long> {
        @Override
        public Long call() throws Exception {
            System.out.println("thread three");
            return 23L;
        }
    }
    

    常用方法

    方法名 static 功能说明 注意
    start() 启动一个新线程,在新的线程运行run方法中的代码 start方法只是让线程进入就绪,里面的代码不一定立刻运行(cpu时间片还没有分给它)。每个线程对象的start方法只能调用一次
    run() 新线程启动后会调用的方法 如果在否早Thread对象时传递了Runnable参数,则线程启动后会调用Runnable中的run方法,否则不执行任何线程操作。
    join() 等待线程运行结束
    join(long n) 等待线程运行结束,最多等待n毫秒
    interrupt() 打断线程 如果打断线程整在sleep,wait,join会导致被打断的线程抛出InterruptedException,并清除打断标记;如果打断正在运行的线程,则会设置打断标记;park的线程被打断,也会设置打断标记
    isInterrupted() 线程是否被打断过,不会清除打断标记 不会清除打断标记,正常线程被打断后,不会停止;需要使用打断标记来判断
    interrupted() static 判断当前线程是否被打断 会清除 打断标记

    两阶段终止

    两阶段终止

    线程状态

    (操作系统)层面

    • 初始状态:仅仅实在语言层面创建了线程对象,还未与操作系统线程相关联
    • 就绪状态:该线程已经被创建(与操作系统线程关联),可由cpu调度执行
    • 运行状态:获取cpu时间片运行中的状态
    • 阻塞状态:缺少资源的状态
    • 终止状态:线程已经执行完毕,生命周期已经结束,不会再转换为其他状态

    (java api)层面

    • new:线程刚被创建,但是还没有调用start()方法
    • runnable:当调用了start()后的状态,对应着操纵系统中的就绪状态、运行状态、和阻塞状态
    • blocked:
    • waiting:
    • timed_waiting:
    • terminated:终止状态
    线程六种状态

    状态转换

    状态转换

    情况一:NEW –> RUNNABLE

    当调用了 t.start() 方法时,由 NEW –> RUNNABLE

    情况二: RUNNABLE <–> WAITING

    • 当调用了t 线程用 synchronized(obj) 获取了对象锁后,调用 obj.wait() 方法时,t 线程从 RUNNABLE –> WAITING

    • 调用 obj.notify() , obj.notifyAll() , t.interrupt() 时,会在 WaitSet 等待队列中出现锁竞争,非公平竞争

      • 竞争锁成功,t 线程从 WAITING –> RUNNABLE

      • 竞争锁失败,t 线程从 WAITING –> BLOCKED

    情况三:RUNNABLE <–> WAITING

    当前线程调用 t.join() 方法时,当前线程从 RUNNABLE –> WAITING 注意是当前线程在 t 线程对象的监视器上等待 t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING –> RUNNABLE

    情况四: RUNNABLE <–> WAITING

    当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE –> WAITING 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING –> RUNNABLE

    情况五: RUNNABLE <–> TIMED_WAITING

    t 线程用 synchronized(obj) 获取了对象锁后 调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE –> TIMED_WAITING t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时 竞争锁成功,t 线程从 TIMED_WAITING –> RUNNABLE 竞争锁失败,t 线程从 TIMED_WAITING –> BLOCKED

    情况六:RUNNABLE <–> TIMED_WAITING

    当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE –> TIMED_WAITING 注意是当前线程在 t 线程对象的监视器上等待 当前线程等待时间超过了 n 毫秒,或 t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 TIMED_WAITING –> RUNNABLE

    情况七:RUNNABLE <–> TIMED_WAITING

    当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE –> TIMED_WAITING 当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING –> RUNNABLE

    情况八:RUNNABLE <–> TIMED_WAITING

    当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线 程从 RUNNABLE –> TIMED_WAITING 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从 TIMED_WAITING–> RUNNABLE

    情况九:RUNNABLE <–> BLOCKED

    t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE –> BLOCKED 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争 成功,从 BLOCKED –> RUNNABLE ,其它失败的线程仍然 BLOCKED

    情况十: RUNNABLE <–> TERMINATED

    当前线程所有代码运行完毕,进入 TERMINATED

    共享模型之管程(Monitor)

    i++实际的jvm字节码指令

    getstatic   i //获取静态变量i的值
    iconst_1        //准备常量1
    iadd            //自增
    putstatic   i    //将修改后的值存入静态变量i
    

    方法上的synchronized

     class  test{  //锁成员方法   等价于锁当前对象
            public synchronized void test(){}
        }
    
    等价于
     class  test{
            public  void test(){
                synchronized(this){
                }
            }
    }
    
    
    
    class  Test{   //锁静态方法  等价于锁当前类对象
        public synchronized static void test(){}
    }
    
    等价于
        
    class  Test{
            public  static void test(){
                synchronized(Test.class){
                    
                }
            }
        }
    
    
    

    偏向锁的撤销

    将对象从可偏向变成不可偏向状态

    1. 调用对象的hashCode
    2. 其他线程使用对象
    3. 调用wait/notify

    sleep和wait的区别

    wait sleep
    同步 只能在同步上下文中调用wait方法,。否则或抛出iiegalMonitorStateException异常 不与需要在同步方法或者同步块中调用
    作用对象 wait方法定义在Object类中,作用于对象本身 sleep方法定义在Java.lang.Thread中,作用于当前线程
    释放锁资源
    唤醒条件 其他线程调用对象的notify()或者notifyAll()方法 超时或者调用interrupt方法体
    方法属性 wait是实例方法 sleep是静态方法

    park和unpark 与Object的wait & notify区别

    • wait,notify和notifyAll必须配合Object Monitor一起使用,而unpark不必
    • park & unpark 是以线程为单位来阻塞唤醒线程,而notify只能随机唤醒一个等待线程,notiifyAll是唤醒所有的等待进程,就不那么精确
    • park & unpark 可以先unpark,而wait & notify 不能先notify

    死锁和活锁

    死锁:竞争对方已拥有的资源 可使用顺序加锁的方式解决

    活锁:任务或者执行者没有被阻塞,由于某些条件没有被满足,导致一直重复尝试,失败,尝试,失败。 可以使用先来先服务方式解决

    ReentrantLock

    相对于synchronized 有如下特点:

    • 可中断
    • 可以设置超时时间
    • 可以设置公平锁 (先进先出) 默认为非公平锁
    • 支持多个条件变量 (等价于多个waitSet)
    • 对象的级别保护临界区

    与synchronized一样,都支持可重入

    可重入

    同一个线程吐过首次获得了这把锁,那么因为她是这把锁的拥有者,因此有权利再次获取这把锁

    如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住

    可打断

    如果某个线程处于阻塞状态,可以调用其interrup他方法让其停止阻塞,获得锁失败

    简而言之就是:处于阻塞状态的线程,被打断了就不用阻塞了,直接停止运行

    调用 lock.lockInterruptibly();方法 如果使用lock.lock()方法不可打断

    public static void main(String[] args) {
            ReentrantLock lock = new ReentrantLock();
            Thread t1 = new Thread(() -> {
                try {
                    // 加锁,可打断锁
                    lock.lockInterruptibly();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // 被打断,返回,不再向下执行
                    return;
                }finally {
                    // 释放锁
                    lock.unlock();
                }
    
            });
    
            lock.lock();
            try {
                t1.start();
                Thread.sleep(1000);
                // 打断
                t1.interrupt();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
    

    锁超时

    使用 lock.tryLock 方法会返回获取锁是否成功。如果成功则返回 true ,反之则返回 false 。
    并且 tryLock 方法可以指定等待时间,参数为:tryLock(long timeout, TimeUnit unit), 其中 timeout 为最长等待时间,TimeUnit 为时间单位
    简而言之就是:获取锁失败了、获取超时了或者被打断了,不再阻塞,直接停止运行。
    不设置等待时间

    public static void main(String[] args) {
            ReentrantLock lock = new ReentrantLock();
            Thread t1 = new Thread(() -> {
                // 未设置等待时间,一旦获取失败,直接返回false
                if(!lock.tryLock()) {
                    System.out.println("获取失败");
                    // 获取失败,不再向下执行,返回
                    return;
                }
                System.out.println("得到了锁");
                lock.unlock();
            });
    
    
            lock.lock();
            try{
                t1.start();
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
    

    设置等待时间

    public static void main(String[] args) {
            ReentrantLock lock = new ReentrantLock();
            Thread t1 = new Thread(() -> {
                try {
                    // 判断获取锁是否成功,最多等待1秒
                    if(!lock.tryLock(1, TimeUnit.SECONDS)) {
                        System.out.println("获取失败");
                        // 获取失败,不再向下执行,直接返回
                        return;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // 被打断,不再向下执行,直接返回
                    return;
                }
                System.out.println("得到了锁");
                // 释放锁
                lock.unlock();
            });
    
    
            lock.lock();
            try{
                t1.start();
                // 打断等待
                t1.interrupt();
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
    

    公平锁

    在线程获取锁失败,进入阻塞队列时,先进入的会在锁被释放后先获得锁。这样的获取方式就是公平的。

    // 默认是不公平锁,需要在创建时指定为公平锁
    ReentrantLock lock = new ReentrantLock(true);
    

    条件变量

    synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入waitSet 等待。
    ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

    • synchronized 是那些不满足条件的线程都在一间休息室等消息
    • 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒

    使用要点:

    • await 前需要获得锁
    • await 执行后,会释放锁,进入 conditionObject 等待
    • await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
    • 竞争 lock 锁成功后,从 await 后继续执
    • 使用await后要使用signal来唤醒

    同步模式之顺序控制

    wait/notify实现

    public class Code_32_Test {
    
        public static void main(String[] args) {
            WaitAndNotify waitAndNotify = new WaitAndNotify(1, 5);
    
            new Thread(()->{
                waitAndNotify.run("a", 1, 2);
            }).start();
            new Thread(()->{
                waitAndNotify.run("b", 2, 3);
            }).start();
            new Thread(()->{
                waitAndNotify.run("c", 3, 1);
            }).start();
        }
    }
    
    class WaitAndNotify {
        public void run(String str, int flag, int nextFlag) {
            for(int i = 0; i < loopNumber; i++) {
                synchronized(this) {
                    while (flag != this.flag) {
                        try {
                            this.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.print(str);
                    // 设置下一个运行的线程标记
                    this.flag = nextFlag;
                    // 唤醒所有线程
                    this.notifyAll();
                }
            }
        }
    
        private int flag;
        private int loopNumber;
    
        public WaitAndNotify(int flag, int loopNumber) {
            this.flag = flag;
            this.loopNumber = loopNumber;
        }
    }
    
    

    park和unpark实现

    public class Code_33_Test {
    
        public static Thread t1, t2, t3;
        public static void main(String[] args) {
            ParkAndUnPark obj = new ParkAndUnPark(5);
            t1 = new Thread(() -> {
                obj.run("a", t2);
            });
    
            t2 = new Thread(() -> {
                obj.run("b", t3);
            });
    
            t3 = new Thread(() -> {
                obj.run("c", t1);
            });
            t1.start();
            t2.start();
            t3.start();
    
            LockSupport.unpark(t1);
        }
    }
    
    class ParkAndUnPark {
        public void run(String str, Thread nextThread) {
            for(int i = 0; i < loopNumber; i++) {
                LockSupport.park();
                System.out.print(str);
                LockSupport.unpark(nextThread);
            }
        }
    
        private int loopNumber;
    
        public ParkAndUnPark(int loopNumber) {
            this.loopNumber = loopNumber;
        }
    }
    
    

    await/signal实现

    public class Code_34_Test {
    
        public static void main(String[] args) {
            AwaitAndSignal lock = new AwaitAndSignal(5);
            Condition a = lock.newCondition();
            Condition b = lock.newCondition();
            Condition c = lock.newCondition();
            new Thread(() -> {
                lock.run("a", a, b);
            }).start();
    
            new Thread(() -> {
                lock.run("b", b, c);
            }).start();
    
            new Thread(() -> {
                lock.run("c", c, a);
            }).start();
    
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            lock.lock();
            try {
                a.signal();
            }finally {
                lock.unlock();
            }
        }
    }
    
    class AwaitAndSignal extends ReentrantLock {
        public void run(String str, Condition current, Condition nextCondition) {
            for(int i = 0; i < loopNumber; i++) {
                lock();
                try {
                    current.await();
                    System.out.print(str);
                    nextCondition.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    unlock();
                }
            }
        }
    
        private int loopNumber;
    
        public AwaitAndSignal(int loopNumber) {
            this.loopNumber = loopNumber;
        }
    }
    

    共享模型之内存

    java内存模型(JMM)

    JMM 即 Java Memory Model,它定义了主存(共享内存)、工作内存(线程私有)抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、 CPU 指令优化等。
    JMM 体现在以下几个方面

    • 原子性 - 保证指令不会受到线程上下文切换的影响
    • 可见性 - 保证指令不会受 cpu 缓存的影响
    • 有序性 - 保证指令不会受 cpu 指令并行优化的影响

    volatile原理

    volatile的底层实现原理是内存屏障,Memory Barrier

    • 对volatile变量的写指令后会加入写屏障
    • 对volatile变量的读指令前会加入读屏障

    保证可见性

    • 写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
    public void actor2(I_Result r) {
         num = 2;
         ready = true; // ready 是被 volatile 修饰的,赋值带写屏障
         // 写屏障
    }
    
    • 而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
    public void actor1(I_Result r) {
     // 读屏障
     // ready是被 volatile 修饰的,读取值带读屏障
     if(ready) {
        r.r1 = num + num;
     } else {
        r.r1 = 1;
     }
    }
    

    分析如图:


    屏障

    保证有序性

    • 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
    public void actor2(I_Result r) {
     num = 2;
     ready = true; // ready 是被 volatile 修饰的,赋值带写屏障
     // 写屏障
    }
    
    • 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
    public void actor1(I_Result r) {
     // 读屏障
     // ready 是被 volatile 修饰的,读取值带读屏障
     if(ready) {
     r.r1 = num + num;
     } else {
     r.r1 = 1;
     }
    }
    

    注意

    volatile只能保证有序性和可见性,不能解决指令交错

    写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证其它线程的读跑到它前面去。 而有序性的保证也只是保证了本线程内相关代码不被重排序

    image-20210425200550672.png

    synchronized可以保证原子性、有序性、可见性是对其可以完全管理的资源来说的,如使用double-checked时的INSTANCE未被完全管理,则不能完全实现有序性、可见性

    // 最开始的单例模式是这样的
        public final class Singleton {
            private Singleton() { }
            private static Singleton INSTANCE = null;
            public static Singleton getInstance() {
            // 首次访问会同步,而之后的使用不用进入synchronized
            synchronized(Singleton.class) {
                if (INSTANCE == null) { // t1
                    INSTANCE = new Singleton();
                }
            }
                return INSTANCE;
            }
        }
    
    // 但是上面的代码块的效率是有问题的,因为即使已经产生了单实例之后,之后调用了getInstance()方法之后还是会加锁,这会严重影响性能!因此就有了模式如下double-checked lockin:
        public final class Singleton {
            private Singleton() { }
            private static Singleton INSTANCE = null;
            public static Singleton getInstance() {
                if(INSTANCE == null) { // t2
                    // 首次访问会同步,而之后的使用没有 synchronized
                    synchronized(Singleton.class) {
                        if (INSTANCE == null) { // t1
                            INSTANCE = new Singleton();
                        }
                    }
                }
                return INSTANCE;
            }
        }
    //但是上面的if(INSTANCE == null)判断代码没有在同步代码块synchronized中,不能享有synchronized保证的原子性,可见性。所以
    

    以上的实现特点是:

    • 懒惰实例化
    • 首次使用 getInstance() 才使用 synchronized 加锁,后续使用时无需加锁
    • 有隐含的,但很关键的一点:第一个 if 使用了 INSTANCE 变量,是在同步块之外

    但在多线程环境下,上面的代码是有问题的,getInstance 方法对应的字节码为:

    0: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
    3: ifnonnull 37
    // ldc是获得类对象
    6: ldc #3 // class cn/itcast/n5/Singleton
    // 复制操作数栈栈顶的值放入栈顶, 将类对象的引用地址复制了一份
    8: dup
    // 操作数栈栈顶的值弹出,即将对象的引用地址存到局部变量表中
    // 将类对象的引用地址存储了一份,是为了将来解锁用
    9: astore_0
    10: monitorenter
    11: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
    14: ifnonnull 27
    // 新建一个实例
    17: new #3 // class cn/itcast/n5/Singleton
    // 复制了一个实例的引用
    20: dup
    // 通过这个复制的引用调用它的构造方法
    21: invokespecial #4 // Method "<init>":()V
    // 最开始的这个引用用来进行赋值操作
    24: putstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
    27: aload_0
    28: monitorexit
    29: goto 37
    32: astore_1
    33: aload_0
    34: monitorexit
    35: aload_1
    36: athrow
    37: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
    40: areturn
    

    其中:

    • 17 表示创建对象,将对象引用入栈 // new Singleton
    • 20 表示复制一份对象引用 // 复制了引用地址
    • 21 表示利用一个对象引用,调用构造方法 // 根据复制的引用地址调用构造方法
    • 24 表示利用一个对象引用,赋值给 static INSTANCE

    可能出现的问题


    线程t1 还未对对象初始化完成,但时赋值操作已经完成,此时线程t2就会使用未初始化完全的对象。

    简单来说,就是new 对象这个操作不是原子操作。会分成创建引用且复制一份引用,其中一份引用赋值给变量,另外一份执行<init>函数。所以,如果在初始化没有完成前,就赋值了则会出现并发问题。

    double-checked locking 解决

    加volatile就行了。

    public final class Singleton {
            private Singleton() { }
            private static volatile Singleton INSTANCE = null;
            public static Singleton getInstance() {
                // 实例没创建,才会进入内部的 synchronized代码块
                if (INSTANCE == null) {
                    synchronized (Singleton.class) { // t2
                        // 也许有其它线程已经创建实例,所以再判断一次
                        if (INSTANCE == null) { // t1
                            INSTANCE = new Singleton();
                        }
                    }
                }
                return INSTANCE;
            }
        }
    

    共享模型之无锁

    管程即 monitor 是阻塞式的悲观锁实现并发控制,这章我们将通过非阻塞式的乐观锁的来实现并发控制

    CAS方式

    class AccountSafe implements Account{
    
        AtomicInteger atomicInteger ;
        
        public AccountSafe(Integer balance){
            this.atomicInteger =  new AtomicInteger(balance);
        }
        
        @Override
        public Integer getBalance() {
            return atomicInteger.get();
        }
    
        @Override
        public void withdraw(Integer amount) {
            // 核心代码
            while (true){
                int pre = getBalance();
                int next = pre - amount;
                if (atomicInteger.compareAndSet(pre,next)){
                    break;
                }
            }
        }
    }
    

    原子整数

    java.util.concurrent.atomic并发包提供了一些并发工具类,这里把它分成五类:
    使用原子的方式更新基本类型

    • AtomicInteger:整型原子类
    • AtomicLong:长整型原子类
    • AtomicBoolean :布尔型原子类

    以 AtomicInteger 为例

    public static void main(String[] args) {
            AtomicInteger i = new AtomicInteger(0);
        
            // 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
            System.out.println(i.getAndIncrement());
        
            // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
            System.out.println(i.incrementAndGet());
        
            // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
            System.out.println(i.decrementAndGet());
        
            // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
            System.out.println(i.getAndDecrement());
        
            // 获取并加值(i = 0, 结果 i = 5, 返回 0)
            System.out.println(i.getAndAdd(5));
        
            // 加值并获取(i = 5, 结果 i = 0, 返回 0)
            System.out.println(i.addAndGet(-5));
        
            // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
            // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
            System.out.println(i.getAndUpdate(p -> p - 2));
        
            // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
            // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
            System.out.println(i.updateAndGet(p -> p + 2));
        
            // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
            // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
            // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
            // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
            System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
        
            // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1值, 结果 i = 0, 返回 0)
            // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
            System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
        }
    
    

    原子引用

    为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。

    基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。

    • AtomicReference:引用类型原子类
    • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
    • AtomicMarkableReference :原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起。只关心是否被更改过

    原子数组

    改变的只是对象本身里面的值,而不需要创建一个新的数组对象

    使用原子的方式更新数组里的某个元素

    • AtomicIntegerArray:整形数组原子类
    • AtomicLongArray:长整形数组原子类
    • AtomicReferenceArray :引用类型数组原子类
    public class Code_10_AtomicArrayTest {
    
        public static void main(String[] args) throws InterruptedException {
            /**
            参数1,提供数组、可以是线程不安全数组或线程安全数组
            参数2,获取数组长度的方法
            参数3,自增方法,回传array,index
            参数4,打印数组的方法
            
            
            函数式编程:
            //supplier 提供者  无中生有 ()->结果
            //function  函数      一个参数一个结果 (参数)->结果, BiFunction(参数1,参数2)-> 结果
            //consumer  消费者     一个参数没有结果  ()->void,   BiComsumer(参数1,参数2)->
            
             * 结果如下:
             * [9934, 9938, 9940, 9931, 9935, 9933, 9944, 9942, 9939, 9940]
             * [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
             */
            demo(
                    () -> new int[10],
                    (array) -> array.length,
                    (array, index) -> array[index]++,
                    (array) -> System.out.println(Arrays.toString(array))
            );
            TimeUnit.SECONDS.sleep(1);
            demo(
                    () -> new AtomicIntegerArray(10),
                    (array) -> array.length(),
                    (array, index) -> array.getAndIncrement(index),
                    (array) -> System.out.println(array)
            );
        }
    
        private static <T> void demo(
                Supplier<T> arraySupplier,
                Function<T, Integer> lengthFun,
                BiConsumer<T, Integer> putConsumer,
                Consumer<T> printConsumer) {
            ArrayList<Thread> ts = new ArrayList<>(); // 创建集合
            T array = arraySupplier.get(); // 获取数组
            int length = lengthFun.apply(array); // 获取数组的长度
            for(int i = 0; i < length; i++) {
                ts.add(new Thread(() -> {
                    for (int j = 0; j < 10000; j++) {
                        putConsumer.accept(array, j % length);
                    }
                }));
            }
            ts.forEach(Thread::start);
            ts.forEach((thread) -> {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            printConsumer.accept(array);
        }
    
    }
    

    字段更新器

    原子累加器

    AtomicLong Vs LongAdder

        public static void main(String[] args) {
            for(int i = 0; i < 5; i++) {
                demo(() -> new AtomicLong(0), (ref) -> ref.getAndIncrement());
            }
            for(int i = 0; i < 5; i++) {
                demo(() -> new LongAdder(), (ref) -> ref.increment());
            }
        }
    
        private static <T> void demo(Supplier<T> supplier, Consumer<T> consumer) {
            ArrayList<Thread> list = new ArrayList<>();
    
            T adder = supplier.get();
            // 4 个线程,每人累加 50 万
            for (int i = 0; i < 4; i++) {
                list.add(new Thread(() -> {
                    for (int j = 0; j < 500000; j++) {
                        consumer.accept(adder);
                    }
                }));
            }
            long start = System.nanoTime();
            list.forEach(t -> t.start());
            list.forEach(t -> {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            long end = System.nanoTime();
            System.out.println(adder + " cost:" + (end - start)/1000_000);
        }
    
    
    

    执行代码后,发现使用 LongAdder 比 AtomicLong 快2,3倍,使用 LongAdder 性能提升的原因很简单,就是在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

    LongAdder 原理

    和ConcurrentHashMap中的countcell数组使用了相同的思想

    LongAdder 类有几个关键域
    public class LongAdder extends Striped64 implements Serializable {}
    下面的变量属于 Striped64 被 LongAdder 继承。

    // 累加单元数组, 懒惰初始化
    transient volatile Cell[] cells;
    // 基础值, 如果没有竞争, 则用 cas 累加这个域
    transient volatile long base;
    // 在 cells 创建或扩容时, 置为 1, 表示加锁
    transient volatile int cellsBusy; 
    

    使用cas实现一个自旋锁

     public void lock() {
            while (true) {
                if(state.compareAndSet(0, 1)) {
                    break;
                }
            }
        }
    
        public void unlock() {
            log.debug("unlock...");
            state.set(0);
        }
    

    原理之伪内存

    // 防止缓存行伪共享
    @sun.misc.Contended   //防止一个缓存行容纳多个cell对象
    static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
        final boolean cas(long prev, long next) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
        }
        // 省略不重要代码
    }
    

    每个cpu拥有着多级缓存(一般为3级缓存),缓存以缓存行为单位i,每个缓存行对应着一块内存,一般是64字节

    cpu要保证数据的一致性,如果某个cpu核心更改了数据,其它cpu核心对应的整个缓存行必须失效

    Unsafe

    Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。LockSupport 的 park 方法,cas 相关的方法底层都是通过Unsafe类来实现的。

    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
            // Unsafe 使用了单例模式,unsafe 对象是类中的一个私有的变量 
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            Unsafe unsafe = (Unsafe)theUnsafe.get(null);
            
        }
    

    final原理

    和volatile一样,使用了读写屏障

    线程池

    具体请见线程池简易实现和线程池源码

    image-20210708194709608.png

    自定义线程池

    package Thread;
    
    
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * 自定义线程池
     */
    
    public class demo06 {
    
        public static void main(String[] args) {
            ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,
                    (queue, task) -> {
                        // 1. 阻塞等待。
    //                    queue.put(task);
                        // 2. 带超时的等待
    //                    queue.offer(task, 500, TimeUnit.MILLISECONDS);
                        // 3. 调用者放弃
    //                    log.info("放弃 {}", task);
                        // 4. 调用者抛出异常
    //                    throw new RuntimeException("任务执行失败" + task);
                        // 5. 调用者自己执行任务
                        task.run();
                    });
            for(int i = 0; i < 4; i++) {
                int j = i;
                threadPool.executor(() ->{
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(j);
                });
            }
        }
    
    }
    
    @FunctionalInterface // 拒绝策略
    interface RejectPolicy<T> {
        void reject(BlockingQueue<T> queue, T task);
    }
    
    // 实现线程池
    class ThreadPool {
        // 线程集合
        private Set<Worker> works = new HashSet<Worker>();
        // 任务队列
        private BlockingQueue<Runnable> taskQueue;
        // 线程池的核心数
        private int coreSize;
        // 获取任务的超时时间
        private long timeout;
        private TimeUnit unit;
        // 使用策略模式。
        private RejectPolicy<Runnable> rejectPolicy;
    
        public ThreadPool(int coreSize, long timeout, TimeUnit unit, int queueCapacity,
                          RejectPolicy<Runnable> rejectPolicy) {
            this.coreSize = coreSize;
            this.timeout = timeout;
            this.unit = unit;
            taskQueue = new BlockingQueue<Runnable>(queueCapacity);
            this.rejectPolicy = rejectPolicy;
        }
    
        // 执行任务
        public void executor(Runnable task) {
            // 如果线程池满了. 就将任务加入到任务队列, 否则执行任务
            synchronized (works) {
                if(works.size() < coreSize) {
                    Worker worker = new Worker(task);
    //                log.info("新增 worker {} ,任务 {}", worker, task);
                    works.add(worker);
                    worker.start();
                } else {
    //                taskQueue.put(task);
                    // 1)死等
                    // 2)带超时等待
                    // 3)让调用者放弃任务执行
                    // 4)让调用者抛出异常
                    // 5)让调用者自己执行任务
    
                    taskQueue.tryPut(rejectPolicy, task);
                }
            }
        }
    
        class Worker extends Thread {
    
            private Runnable task;
    
            public Worker(Runnable task) {
                this.task = task;
            }
    
            // 执行任务
            // 1)当 task 不为空,执行任务
            // 2)当 task 执行完毕,再接着从任务队列获取任务并执行
            @Override
            public void run() {
    //            while (task != null || (task = taskQueue.take()) != null) {
                while (task != null || (task = taskQueue.poll(timeout, unit)) != null) {
                    try {
    //                    log.info("正在执行 {}", task);
                        task.run();
                    }catch (Exception e) {
    
                    } finally {
                        task = null;
                    }
                }
                synchronized (works) {
    //                log.info("worker 被移除 {}", this);
                    works.remove(this);
                }
            }
        }
    }
    
    // 实现阻塞队列
    class BlockingQueue<T> {
    
        // 阻塞队列的容量
        private int capacity;
        // 双端链表, 从头取, 从尾加
        private Deque<T> queue;
        // 定义锁
        private ReentrantLock lock;
        // 当阻塞队列满了时候, 去 fullWaitSet 休息, 生产者条件变量
        private Condition fullWaitSet;
        // 当阻塞队列空了时候,去 emptyWaitSet 休息, 消费者小件变量
        private Condition emptyWaitSet;
    
        public BlockingQueue(int capacity) {
            queue = new ArrayDeque<>(capacity);
            lock = new ReentrantLock();
            fullWaitSet = lock.newCondition();
            emptyWaitSet = lock.newCondition();
            this.capacity = capacity;
        }
    
        // 带有超时时间的获取
        public T poll(long timeout, TimeUnit unit) {
            lock.lock();
            try {
                // 同一时间单位
                long nanos = unit.toNanos(timeout);
                while (queue.isEmpty()) {
                    try {
                        if(nanos <= 0) {
                            return null;
                        }
                        // 防止虚假唤醒, 返回的是所剩时间
                        nanos = emptyWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                T t = queue.removeFirst();
                fullWaitSet.signal();
                return t;
            }finally {
                lock.unlock();
            }
        }
    
        // 获取
        public T take() {
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    try {
                        emptyWaitSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                T t = queue.removeFirst();
                fullWaitSet.signal();
                return t;
            }finally {
                lock.unlock();
            }
        }
    
        // 添加
        public void put(T task) {
            lock.lock();
            try {
                while (queue.size() == capacity) {
                    try {
    //                    log.info("等待加入任务队列 {}", task);
                        fullWaitSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    //            log.info("加入任务队列 {}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }finally {
                lock.unlock();
            }
        }
        // 带有超时时间的添加
        public boolean offer(T task, long timeout, TimeUnit unit) {
            lock.lock();
            try {
                long nanos = unit.toNanos(timeout);
                while (queue.size() == capacity) {
                    try {
                        if(nanos <= 0) {
                            return false;
                        }
    //                    log.info("等待加入任务队列 {}", task);
                        nanos = fullWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    //            log.info("加入任务队列 {}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
                return true;
            }finally {
                lock.unlock();
            }
        }
    
        public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
            lock.lock();
            try {
                // 判断判断是否满
                if(queue.size() == capacity) {
                    rejectPolicy.reject(this, task);
                } else { // 有空闲
    //                log.info("加入任务队列 {}", task);
                    queue.addLast(task);
                    emptyWaitSet.signal();
                }
    
            }finally {
                lock.unlock();
            }
        }
    
        public int getSize() {
            lock.lock();
            try {
                return queue.size();
            } finally {
                lock.unlock();
            }
        }
    }
    

    AQS

    具体请见AQS源码浅析

    全称是AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架。

    特点:

    • 用state属性来表示资源的状态(分为独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和设防所

      • getState-获取state状态

      • setState=设置state状态

      • compareAndSetState - 乐观锁设置state状态

      • 独占模式是只有以一个线程能够访问资源,而共享模式可以允许多个线程访问资源

    • 提供了基于FIFO的等待队列,类似于Monitor的EntryList

    • 条件变量来实现等待,唤醒机制,支持多个条件变量,类似于Monitor的WaitSet

    ReentrantLock

    image-20210517145119092.png

    非公平锁实现原理

    从构造器开始看,默认为非公平锁实现

    public ReentrantLock() {
     sync = new NonfairSync();
     }
    

    lock()

    总结:

    掌握知识点:

    Synchronized原理 LockSupport原理 ReenTrantLock原理

    • 分析多线程访问共享资源时,哪些代码片段属于临界区
    • 使用 synchronized 互斥解决临界区的线程安全问题
      • 掌握 synchronized 锁对象语法
      • 掌握 synchronzied 加载成员方法和静态方法语法
      • 掌握 wait/notify 同步方法
    • 使用 lock 互斥解决临界区的线程安全问题 掌握 lock 的使用细节:可打断、锁超时、公平锁、条件变量
    • 学会分析变量的线程安全性、掌握常见线程安全类的使用
    • 了解线程活跃性问题:死锁、活锁、饥饿
    • 应用方面
      • 互斥:使用 synchronized 或 Lock 达到共享资源互斥效果,实现原子性效果,保证线程安全。
      • 同步:使用 wait/notify 或 Lock 的条件变量来达到线程间通信效果。
    • 原理方面
      • monitor、synchronized 、wait/notify 原理
      • synchronized 进阶原理
      • park & unpark 原理
    • 模式方面
      • 同步模式之保护性暂停
      • 异步模式之生产者消费者
      • 同步模式之顺序控制

    JMM CAS原理 Volatile原理

    • 可见性 - 由 JVM 缓存优化引起
    • 有序性 - 由 JVM 指令重排序优化引起
    • happens-before 规则
    • 原理方面
      • volatile
    • 模式方面
      • 两阶段终止模式的 volatile 改进
      • 同步模式之 balking
    1. CAS 与 volatile
    2. juc 包下 API
      1. 原子整数
      2. 原子引用
      3. 原子数组
      4. 字段更新器
      5. 原子累加器
    3. Unsafe
    4. 原理方面
      1. LongAdder 源码
      2. 伪共享

    相关文章

      网友评论

          本文标题:java并发基础

          本文链接:https://www.haomeiwen.com/subject/arqhpltx.html