美文网首页
JAVA并发——线程同步器AQS

JAVA并发——线程同步器AQS

作者: 海晨忆 | 来源:发表于2021-07-19 22:33 被阅读0次

    个人博客:haichenyi.com。感谢关注

    前言

      每一项技术的提出都是为了解决某一个问题,带着问题来理解技术,使得印象你对这个技术的理解印象更加深刻。

    并发

      举个栗子:现在有一个需求,网络请求分两批(A,B两个批次),A批次并行请求,B批次串行请求按顺序一个一个请求,有一个总超时时间,B批次每一次请求都有一个超时时间,A批次并发请求先请求,在规定时间内没有返回,再开始请求B批次,谁先返回用谁的。

      这个是我们项目里面简化过后的一个逻辑,实际逻辑,比这个还要复杂。怎么实现这个功能呢?

      带着这个问题来进入我们的正题,什么是并发?

    1. 并发是一种现象:同时运行多个程序或者多个任务需要被处理的现象。
    2. 这些任务可能是并行的,也可能是串行的,和CPU的核心数无关,是操作系统进程调度和CPU上下文切换达到的结果。
    3. 解决并发的思路就是把分解,把一个大任务分解成多个小任务来执行

      像我们上面所说的需求就是分解成一个一个的网络请求,一部分并行请求,一部分串行请求。并行请求简单来说,对应我们应用里面就是多线程,多线程同时执行;串行请求对应我们应用里面就是单线程,一个线程执行完了,另一个线程才开始。这里不考虑多进程的问题。

    并发为什么会造成线程不安全的问题

      我们先来聊聊cpu是怎么执行指令:

    1. 首先,cpu执行指令的过程中,不可避免会执行读写操作,而这个操作都是从主存(也就是物理内存)中去读写
    2. 但是,cpu执行指令速度很快,程序运行过程中的临时变量都是放在主存当中的,如果全部都是从主存中去读写,读写很耗时,这样就浪费了cpu的性能
    3. 最后,为了解决这个问题,就出现了高速缓存的概念。我们先把变量读取到告诉缓存中,然后,再高速缓存中操作完之后,再刷新到主存当中。

      java的内存模型规定,所有的变量都在主存当中,类似于物理内存,每个线程都有自己的工作空间,也就是对应上面的高速缓存,每一个线程都有一个自己的高速缓存。

      线程对变量的操作必须在自己的工作空间内,不能直接操作主存,而且,一个线程也不能访问另一个线程的工作空间。

      那么,我们如果多个线程同时对一个变量做加1操作,如下面的add1方法。我们thread1和thread2把a的值同时复制到自己的工作空间中时,都是0,然后同时进行加1操作,同时刷新到主存当中,那最后,我们获取到的最终的值就是1,而不是我们想要的2.这就是并发造成的线程不安全的问题

    简单的同步器

      我们项目里面遇到的并发问题,基本上就是多线程访问同一变量的问题,比方说,简单的举个栗子,两个线程对同一个int值做加1操作,然后打印出来。

        public int a = 0;
        @org.junit.Test
        public void add1() {
            Thread thread1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    a++;
                    System.out.println("thread1:a=" + a);
                }
            });
    
            Thread thread2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    a++;
                    System.out.println("thread2:a=" + a);
                }
            });
            thread1.start();
            thread2.start();
        }
    

      因为这里直接用的基本类型,极端情况下,线程1,线程2同时执行,里面的a++操作也是同时执行,那这里两个打印都是1,虽然,这里我没有复现出来。但是,这种情况肯定是存在的。那么,怎么避免这种情况呢?

      我们可以写一个简单的线程同步器,就是加锁操作,如下:

        private Object lock = new Object();
        public int a = 0;
        @org.junit.Test
        public void add1(){
            Thread thread1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    //先获取到lock 对象的锁
                    synchronized (lock){
                        try {
                         //在lock对象上执行wait()方法,让其进入休眠,等待有人唤醒自己
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        a++;
                        System.out.println("thread1:a=" + a);
                    }
                }
            });
    
            Thread thread2 = new Thread(new Runnable() {
                @Override
                public void run() {
                //获取lock的锁
                    synchronized (lock){
                        a++;
                        System.out.println("thread2:a=" + a);
                        //唤醒正在lock对象上等待的线程
                        lock.notify();
                    }
                }
            });
            thread1.start();
            thread2.start();
        }
    

      这里,我们用到的object类的wait和notify方法。等待和唤醒。当线程1执行到lock.wait();方法时,线程1会进入等待状态。当线程2执行lock.notify();时,会唤醒线程1,执行线程1的后续a++,打印操作。

      那么,这里会有一个问题,如果线程2先执行,线程1后执行,那么线程1将永远的等待下去,这也是这样写的一个弊端。这还只是2个线程,实际项目中往往比这个复杂多了。为了解决这弊端,就引出了我们的线程同步器AQS(AbstractQueuedSynchronizer)

      并且,这个同步器,java.util包下面都已经给好了实现类,比方说:Semaphore,ReentrantLock,CountDownLatch等等都是,并且,我们用到的java线程池ThreadPoolExecutor中的Worker的实现也是。如下图:

    worker图片.png

    线程同步器AQS(AbstractQueuedSynchronizer)

      线程同步器就是为了解决并发引起的线程不安全的问题。线程安全的三大特性:原子性,可见性,有序性。

    原理

      简单点来说就是,它维护一个状态state,还有一个CLH队列。

      CLH时一个双端队列,队列中每一个节点都放着正在等待获取资源的线程。当线程现在通过CAS原子算法比较预期值的方式去获取资源,也就是判断这里的state状态,是不是有等待获取资源的线程可以使用,如果时有,那就直接使用,如果没有,那就会将这个线程封装成一个节点Node,插入到CLH队列的尾部等待被唤醒。其他线程执行完之后,调用release释放一部分资源,那么,正在等待的队列就会被唤醒,去执行自己的任务。大致是这个意思,当然,AQS还有中断等其他的操作

    简单的AQS同步器

      还是类似于上面那个例子,不过,都是访问同一个变量,业务背景换一下,换成一个库存秒杀,通过访问服务器,一共10件物品,我现在有20个线程去同时请求,哪些能抢到,哪些不能抢到?

    public int count = 10;
    
        @org.junit.Test
        public void test3() {
            for (int i = 1; i <= 20; i++) {
                final int finalI = i;
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        buy(finalI);
                    }
                }).start();
            }
        }
    
    
        public void buy(int i) {
            if (count != 0) {
                count--;
                System.out.println("第" + i + "个用户抢到了," + "库存还剩:" + count + "件");
            } else {
                System.out.println("第" + i + "个用户抢到了," + "已经被抢光了");
            }
        }
    

      就像上面这样模拟一个简单的秒杀场景,库存10件,20个用户抢,实际情况肯定不止。我们看一下这样写的打印,如下图:

    秒杀图1.png

      我们看到这个最终虽然有10个用户的确抢到了,但是,我们看一下打印,打印是从8个开始的,不是从9开始的。我们理想的应该是如下这个图:

    秒杀图2.png

      这个图是我把buy方法加上了synchronized关键字,我给它锁住了。当执行buy方法的时候,另一个线程如果也进来执行buy,它会等待,等待前一个buy方法执行完,它才开始执行。

      那么,直接用这个关键字就好了呀,还要啥自行车?天真,存在即合理。synchronized关键字锁比较重,不适合这种秒杀场景。

      言归正传,上面两个从8开始,结合我们前面说的并发的问题,是不是就可以联想到,如果,多个线程同时,同一时刻访问,然后,数据库的库存同时减1,是不是就会出现一件商品,卖给多个人的情况?

      有同学就会想,那么这么巧,同时访问,同一时刻,那你想想天猫双十一,那些节假日的秒杀场景,会出现什么问题?

      这个时候线程同步器就出现了,我们不能对用户做限制,我们不能说张三你必须在某一个时间内访问,李四在某一个时间内访问,所以,我们能控制的只有服务器,也就是这里的buy方法。

      也就是说,我们这里用的是悲观锁的方式,进入buy方法就立刻加锁,运行完buy方法就解锁。后面应用里面再聊这个悲观锁,乐观锁之类的。我们就把buy方法改成如下这样了:

        MyOwnLock ownLock = new MyOwnLock();
    
        public void buy(int i) {
            //加锁
            ownLock.lock();
            if (count != 0) {
                count--;
                System.out.println("第" + i + "个用户抢到了," + "库存还剩:" + count + "件");
            } else {
                System.out.println("第" + i + "个用户抢到了," + "已经被抢光了");
            }
            //解锁
            ownLock.unLock();
        }
        
    /**
     * @ClassName: MyOwnLock
     * @Description:
     * @Author: 海晨忆
     * @Date: 2021/7/15 11:07
     */
    public class MyOwnLock {
        public void lock() {
    
        }
    
        public void unLock() {
    
        }
    }
    

      如上代码,现在,我们要做的就是完善MyOwnLock类的加锁和解锁方法。

      我们想想这个流程,多个用户同时下单,实际上对于代码来讲,就是多个线程同时请求服务器,调用这里的buy方法,去减少库存,我们需要做的就是保证这里减少库存不能出问题。

      怎么保证这个减少库存不能出问题呢?就是我们前面说的并发的问题,也就是这里的线程安全的问题。线程安全的三大特性:原子性,可见性,有序性。我们保证这三个特性就好了。

      第一个线程进来,我们就标记一下,已经有线程进来在执行了,我们就改变这个标记,后面的线程感知到这个改变之后,就必须等待。那这个标记,怎么让其他线程感知到呢?

      volatile关键字的两大特性:可见性,有序性

      可见性,就是可以让其他线程感知到。那么,就解决了这个问题,我们用计数器的方式来做这个标记。我们是悲观锁的方式,始终只能有一个线程访问,必须等这个线程访问完了,其他线程才能访问。

      对应成代码就是,这个计数器变量初始化是0,加锁成功之后,就加1,后面的线程进来的时候,判断这个计数器是不是0,如果不是0,就表示有线程正在访问,不能进行加锁操作;如果是0,就表示没有,可以进行加锁操作。那么我们就开始写代码:

    package com.example.myapplication;
    
    /**
     * @ClassName: MyOwnLock
     * @Description:
     * @Author: 海晨忆
     * @Date: 2021/7/15 11:07
     */
    public class MyOwnLock {
        private volatile int state;
        private Thread currentHolder;
    
        public void lock() {
            Thread currentThread = Thread.currentThread();
            int state = getState();
            if (state == 0) {
                setCurrentHolder(currentThread);
            }
        }
    
        public void unLock() {
    
        }
    
        public int getState() {
            return state;
        }
    
        public void setState(int state) {
            this.state = state;
        }
    
        public Thread getCurrentHolder() {
            return currentHolder;
        }
    
        public void setCurrentHolder(Thread currentHolder) {
            this.currentHolder = currentHolder;
        }
    }
    
    

      直接用if(state==0)来判断就可以了吗?volatile关键字只能保证可见性,有序性,并不能保证原子性,所以,volatile并不是真正的线程安全,只是大多数情况下还是比较有用的,而,我们这里要保证线程安全,就需要保证原子性,原子性怎么保证呢?

      这里就出现了关键的CAS算法了,Compare And Swap比较互换。这个算法java里面是怎么实现的呢?我们java里面有一个Unsafe类,他的里面全是native方法,提供的都是硬件级别的原子操作。我们用到的就是这个类里面的几个方法,CAS的操作也是这些方法实现的compareAndSwapXXX。具体的可以在网上搜一下这个类,然后仔细的看一下,这里,我简单的介绍一个:

      /***
       * Compares the value of the object field at the specified offset
       * in the supplied object with the given expected value, and updates
       * it if they match.  The operation of this method should be atomic,
       * thus providing an uninterruptible way of updating an object field.
       * 在obj的offset位置比较object field和期望的值,如果相同则更新。这个方法
       * 的操作应该是原子的,因此提供了一种不可中断的方式更新object field。
       * 
       * @param obj the object containing the field to modify.
       *    包含要修改field的对象 
       * @param offset the offset of the object field within <code>obj</code>.
       *         <code>obj</code>中object型field的偏移量
       * @param expect the expected value of the field.
       *               希望field中存在的值
       * @param update the new value of the field if it equals <code>expect</code>.
       *               如果期望值expect与field的当前值相同,设置filed的值为这个新值
       * @return true if the field was changed.
       *              如果field的值被更改
       */
      public native boolean compareAndSwapObject(Object obj, long offset,
                                                 Object expect, Object update);
    

      所以,加锁流程就变成了如下图的样子,

    加锁流程图.png

      根据这个流程,代码就变成了如下的样子:

    package com.example.myapplication;
    
    /**
     * @ClassName: MyOwnLock
     * @Description:
     * @Author: 海晨忆
     * @Date: 2021/7/15 11:07
     */
    public class MyOwnLock {
        private volatile int state;
        private Thread currentHolder;
    
        public void lock() {
            if (acquire()){
                return;
            }
            
        }
    
        public boolean acquire(){
            Thread currentThread = Thread.currentThread();
            int state = getState();
            if (state == 0) {
                if (compareAndSwapState(0,1)) {
                    setCurrentHolder(currentThread);
                }
                return true;
            }
            return false;
        }
    
        public void unLock() {
        }
    
        public int getState() {
            return state;
        }
    
        public void setState(int state) {
            this.state = state;
        }
    
        public Thread getCurrentHolder() {
            return currentHolder;
        }
    
        public void setCurrentHolder(Thread currentHolder) {
            this.currentHolder = currentHolder;
        }
    
    }
    

      加锁流程就完了吗?这才哪到哪,我们现在加锁就第一个拿到锁的线程开始用了,那后面没有拿到锁的线程怎么办呢?难道全部丢掉不管吗?

      当然不行。那怎么办呢?我们先用一个队列,把没有拿到锁的线程存起来,排好队,等第一个线程执行完了之后,释放锁的时候,再直接唤醒等待的线程即可;

      把阻塞的线程全放进队列(并且要线程安全,高并发的情况下迅速 入队,出队)中,当T1释放锁是,直接唤醒T2。队列选择用ConcurrentLinkedQueue<Thread>(基于CAS算法,保证入队,出队安全)

      那么,要怎么做呢?要怎么等待呢?最简单的就是写一个死循环,一直循环去判断锁有没有用完。但是,这样会一直占用CPU,消耗性能。

    1. 用sleep?Thread.sleep可能造成等待时间过长,你没法知道休眠的时间,如果,线程1执行完只需要200毫秒,你这里休眠了1000毫秒,那就浪费了800毫秒了。
    2. 那用Thread.yield()可以了吧?答案是不行,这个线程让步,虽然把cpu的时间片让出去给其他线程用了,但是,最后,我们需要唤醒的时候,怎么办呢?

      我们最后唤醒,是要指定唤醒哪一个线程,这里我们还是用到刚才Unsafe里面的两个方法,park和unPark。park阻塞线程,让出cpu的使用权,unPark解除阻塞,唤醒某一个线程。

      这里我们用它的包装类LockSupport。所以,加锁代码就变成这样子了:

        private ConcurrentLinkedQueue<Thread> waiterQueue = new ConcurrentLinkedQueue<>();
    
        public void lock() {
            if (acquire()) {
                return;
            }
            Thread currentThread = Thread.currentThread();
            waiterQueue.add(currentThread);
            for (; ; ) {
                if (currentThread != waiterQueue.peek() && acquire()) {
                    waiterQueue.poll();
                    return;
                }
                LockSupport.park(currentThread);
            }
        }
    

      获取锁的方法也要做相应的调整,如下:

        public boolean acquire() {
            Thread currentThread = Thread.currentThread();
            int state = getState();
            if (state == 0) {
                boolean temp = waiterQueue.size() == 0 || currentThread == waiterQueue.peek();
                if (temp && compareAndSwapState(0, 1)) {
                    setCurrentHolder(currentThread);
                }
                return true;
            }
            return false;
        }
    

      解锁就比较简单了

        public void unLock() {
            if (Thread.currentThread() != currentHolder) {
                throw new RuntimeException("出错啦");
            }
            int state = getState();
            if (compareAndSwapState(state, 0)) {
                setCurrentHolder(null);
                Thread firstThread = waiterQueue.peek();
                if (firstThread != null) {
                    LockSupport.unpark(firstThread);
                }
            }
        }
    

      判断当前线程是不是持有锁的线程,如果不是,那就抛异常
    如果是,那就获取状态state,通过cas算法,置换为0的状态。

      最后运行完,结果如下:

    最终运行结果图.png

    总结:上面我说的简单的线程同步器,只是AQS的一个主要的思想,CAS算法,计数器,线程队列。还有线程中断,独占模式,共享模式,公平锁,不公平锁等等。看完我这个,然后再去看源码应该会更容易懂。

    相关文章

      网友评论

          本文标题:JAVA并发——线程同步器AQS

          本文链接:https://www.haomeiwen.com/subject/wnlkmltx.html