美文网首页
6.AQS(抽象同步帮助器)

6.AQS(抽象同步帮助器)

作者: Junma_c631 | 来源:发表于2020-09-27 15:29 被阅读0次

加锁的本质是什么?
加锁的本质是为了在多线程情况下串行的访问临界资源。

一、JAVA中的等待唤醒机制

1.1 基于monitor机制的 object。wait/notify

package executors;
/*
*基于object的wait和notify机制特点:
1.lock.wait() 当前线程会释放锁,所以代码中notifyMethod才能获取锁成功,才会通知成功
2.如果notifyMethod先执行完毕,lock.wait()后执行就会进入无限等待。
3.和sleep的区别,sleep线程等待不会释放锁资源。sleep基于线程的 wait是基于object的monitor机制,
wait如果不设定超时时间需要被唤醒。
*/
public class WaitTest {
    public static void main(String[] args) {
      WaitTest test = new WaitTest();
      Object lock = new Object();
      Thread t1 = new Thread("等待线程"){
          @Override
          public void run() {
              test.waitMethod(lock);
          }
      };
      t1.start();
      Thread t2 = new Thread("唤醒线程"){
            @Override
            public void run() {
                test.notifyMethod(lock);
            }
      };
      t2.start();
    }

 public void waitMethod(Object lock){
        synchronized (lock){
            try {
                System.out.println(Thread.currentThread().getName()+"begin wait()...");
                lock.wait();
                System.out.println(Thread.currentThread().getName()+"end wait()...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }


        }
 }
    public void notifyMethod(Object lock){
        synchronized (lock){

            try {
                System.out.println(Thread.currentThread().getName()+"begin notify()...");
                Thread.sleep(5000L);
                lock.notify();
                System.out.println(Thread.currentThread().getName()+"end notify()...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}
运行结果:
等待线程begin wait()...
唤醒线程begin notify()...
唤醒线程end notify()...
等待线程end wait()...

1.2基于线程的LookSupport.park/unpark

可以先unpark发放许可,再执行park()线程不再等待直接往下执行;
也可以先park()线程等待,再发放许可,等待的线程往下执行
unpark park无论谁先执行都不会存在无限等待的情况

package executors;
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest {
    public static void main(String[] args) throws Exception {
        LockSupportTest test = new LockSupportTest();
        Thread t1 = new Thread("等待线程"){
            @Override
            public void run() {
                test.waitMethod();
            }
        };
        t1.start();
        Thread t2 = new Thread("唤醒线程"){
            @Override
            public void run() {
                test.notifyMethod();
            }
        };
        t2.start();
        t2.join();
        //给t1线程发放许可,终止t1的等待状态
        LockSupport.unpark(t1);
    }

    public void waitMethod(){
        System.out.println(Thread.currentThread().getName()+"begin wait()...");
        //park是停止的意思,执行这行代码会让当前线程等待
        LockSupport.park();
        System.out.println(Thread.currentThread().getName()+"end wait()...");
    }
    public void notifyMethod(){
        try {
            System.out.println(Thread.currentThread().getName()+"begin notify()...");
            Thread.sleep(5000L);
            System.out.println(Thread.currentThread().getName()+"end notify()...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
执行结果:
等待线程begin wait()...
唤醒线程begin notify()...
唤醒线程end notify()...
等待线程end wait()...

二、java中的线程中断

2.1 java中如何优雅的停止一个线程?

Thread.stop--->立即停止,可能会有未释放的锁,导致死锁。。等等问题。
【interrupt()方法】
java中的中断机制是一种协作机制,也就是说通过中断并不能直接终止另一个线程,而需要被中断的线程自己处理中断。

t1.interrupt()方法只是给线程发送一个中断信号,儿不会中断线程,是否中断线程需要自己进行控制
package executors;

public class ThreadInterruptTest {
    private static int i =0;
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(){
            @Override
            public synchronized void run() {
                while(true){
                    i++;
                    System.out.println(i);
                    //判断线程是否接收到中断信号,执行该方法后会清除中断信号
                    //也就是说该判断只会触发一次
                    if(Thread.interrupted()){
                        System.out.println("==========");
                        //线程不会中断执行,需要自己控制。
                        break;
                    }
                }
            }
        };
        //启动线程
        t1.start();
        //中断线程
        Thread.sleep(10L);
        t1.interrupt();
    }
}
执行结果:
1
2
...
...
258
==========

线程终端API使用

t1.interrupt():将线程的中断标志位设置为true,不会中断线程
t1.isInterrupted:判断当前线程的中断标志位是否是true
Thread.interrupted():判断当前线程的中断标志位是否为true,并清除中断标志位,设置为false
public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag 仅设置中断标志为true
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }
public static boolean interrupted() {
   //获取当前中断标志,并清除     
   return currentThread().isInterrupted(true);
}
public boolean isInterrupted() {
         //获取当前中断标志,不清除标志位
        return isInterrupted(false);
 }

sleep期间可以感知到终端信号,感知后并清除掉中断标志位,
wait也能感知中断标志位感知后并清除掉中断标志位
LockSupport.park()也能感知到终端,感知后不清除中断标志位

package executors;

public class ThreadInterruptTest {
    private static int i =0;
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(){
            @Override
            public synchronized void run() {
                while(true){
                    i++;
                    System.out.println(i);
                    //判断线程是否接收到中断信号,执行该方法后会清除中断信号
                    //也就是说该判断只会触发一次
                    try {
                        wait(5000);//wait一定要跟锁一块配合使用
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if(Thread.interrupted()){
                        System.out.println("==========");
                        //线程不会中断执行,需要自己控制。
                        break;
                    }
                }
            }
        };
        //启动线程
        t1.start();
        //中断线程
        Thread.sleep(10L);
        t1.interrupt();
    }
}
package executors;

import java.util.concurrent.locks.LockSupport;

public class ThreadInterruptTest {
    private static int i =0;
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(){
            @Override
            public synchronized void run() {
                while(true){
                    i++;
                    System.out.println(i);
                    //判断线程是否接收到中断信号,执行该方法后会清除中断信号
                    //也就是说该判断只会触发一次
                    try {
                        LockSupport.park();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if(Thread.interrupted()){
                        System.out.println("==========");
                        //线程不会中断执行,需要自己控制。
                        break;
                    }
                }
            }
        };
        //启动线程
        t1.start();
        //中断线程
        Thread.sleep(10L);
        //t1.interrupt();
    }
}

三、CAS比较与交换

CAS可以看成是乐观锁的一种实现方式,java原子类中的递增操作就通过CAS自旋实现的。
CAS全称CompareAndSwap(比较与交换),是一种无锁算法。在不使用锁的情况下即线程没有被阻塞的情况下实现多线程之间的变量同步。
LOCK_IF_MP(%4)"CMPXCHGL%1,(%3)" lock cmpxchgl
unsafe.compareAndSwapInt(this,valueOffset,expect,update)
AQS中主要是来操作一个标志状态值 state

上面的方法有几个很重要的参数
this:需要修改的对象
valueoffset:value变量的内存偏移量地址
expect:期望更新的值
update:要更新的最新的值
缺点:
只能保证对一个变量的原子性操作
长时间自旋会给CPU带来压力
ABA问题

原子操作类的例子

package executors;

import java.util.concurrent.atomic.AtomicInteger;

public class AutomicIntegerTest {
    private static AtomicInteger atomicInteger= new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<10;i++){
          Thread thread = new Thread(()->{
              for(int j=0;j<10000;j++){
                  atomicInteger.getAndIncrement();
              }
          });
          thread.start();
        }
        Thread.sleep(20000l);
        System.out.println(atomicInteger.get());
    }
}
运行结果始终是:100000
 public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
  }
public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
             //根据偏移量获取内存中最新的值
            //var1需要获取的对象
           //var2:var1在内存中的偏移量
            var5 = this.getIntVolatile(var1, var2);
           //var1需要获取的对象
           //var2:var1在内存中的偏移量
          //var5:当前值
          //var5+var4:需要设置的值
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
  }

解决CAS ABA问题 加版本号的控制

public class ABATest {
    public static void main(String[] args) {
        String str1 = "aaa";
        String str2 = "bbb";
        //str1:值
        //1:版本号
        AtomicStampedReference<String> reference = new AtomicStampedReference<String>(str1,1);
        reference.compareAndSet(str1,str2,reference.getStamp(),reference.getStamp()+1);
        System.out.println("reference.getReference() = " + reference.getReference());

        boolean b = reference.attemptStamp(str2, reference.getStamp() + 1);
        System.out.println("b: "+b);
        System.out.println("reference.getStamp() = "+reference.getStamp());
        boolean c = reference.weakCompareAndSet(str2,"ccc",4, reference.getStamp()+1);
        System.out.println("reference.getReference() = "+reference.getReference());
        System.out.println("c = " + c);
    }
}

四、AQS(AbstractQueuedSynchronizer 抽象同步帮助器)

public abstract class AbstractQueuedSynchronizer  extends AbstractOwnableSynchronizer 
 implements java.io.Serializable {
//锁的状态,state=0代表没有线程占用锁 
private volatile int state;
 protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
 }

}

4.1ReentrantLock之非公平锁

/*
lock接口定义加锁尝试加锁的书写规范
*/
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;

public interface Lock {
    //加锁
    void lock();
    //加锁 可中断
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
public ReentrantLock() {
         //默认构造器创建的是非公平的锁
        sync = new NonfairSync();
}
public void lock() {
    //调用非公平锁同步器的lock方法    
    sync.lock();
}
//非公平锁
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            //先通过cas设置锁状态(一次尝试加锁,相当于插队,所以是非公平锁)
            if (compareAndSetState(0, 1))
                //加锁成功后设置占用锁的线程是当前线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //第一次尝试加锁不成功的话,放入队列排队
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
abstract static class Sync extends AbstractQueuedSynchronizer

AQS中属性解析

public abstract class AbstractQueuedSynchronizer  extends AbstractOwnableSynchronizer 
 implements java.io.Serializable {

//同步帮助器中除了维护了状态还维护了node双向链表,这个就是维护的线程队列
//头节点
 private transient volatile Node head;
//尾节点
 private transient volatile Node tail;
//锁的状态,state=0代表没有线程占用锁 
private volatile int state;
 protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
 }
static final class Node {
        //共享模式
        static final Node SHARED = new Node();
        //独占模式
        static final Node EXCLUSIVE = null;
        //waitStatus的值 
        //被中断的线程(无效的线程)
        static final int CANCELLED =  1;
        //可以被唤醒的线程
        static final int SIGNAL    = -1;
        //
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        //等待状态
        volatile int waitStatus;
        //前一个节点
        volatile Node prev;
        //下一个节点
        volatile Node next;
        //该节点绑定的线程
        volatile Thread thread;
        Node nextWaiter;
}
}

通过一个例子来分析加锁过程

package executors;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockTest {
private static  int sum=0;
private static ReentrantLock lock= new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<3;i++){
            Thread thread = new Thread(()->{
                lock.lock();
                try{
                    for(int j=0;j<10000;j++){
                        sum ++;
                    }
                }finally {
                    //释放锁
                    lock.unlock();
                }
            });
            thread.start();
        }
        Thread.sleep(10000L);
        System.out.println(sum);
    }
}
image.png

thread1的情况分析

 public class ReentrantLock implements Lock, java.io.Serializable {
    ...
    static final class NonfairSync extends Sync{
     ...
     final void lock() {
         if (compareAndSetState(0, 1))
               setExclusiveOwnerThread(Thread.currentThread());
          else
            //thread0还未释放锁的情况下,thread1走这个逻辑入队
            acquire(1);
     }
    }
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
}
public class ReentrantLock implements Lock, java.io.Serializable {
    ...
    static final class NonfairSync extends Sync{
     ...
     protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
     }
     
    }
}
public class ReentrantLock implements Lock, java.io.Serializable {
    ...
    abstract static class Sync extends AbstractQueuedSynchronizer {
       final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //判断状态是否是0,如果是0,通过CAS尝试加锁
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //判断独占线程是否是当前线程,如果是当前线程可以直接进入线程
                       // 从这个判断可以看出,锁是可以重入的
            else if (current == getExclusiveOwnerThread()) {
                //state在原来的基础上加1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //thread1尝试加锁失败
            return false;
        }
    
    }
}

tryAcquire(arg)失败后添加等待者addWaiter


image.png

acquireQueued线程挂起逻辑


image.png
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && //1.尝试加锁失败
            acquireQueued(//3.把当前线程挂起
                 addWaiter(Node.EXCLUSIVE)//2.把当前线程加入等待队列
                 , arg
                 )
                 )
            selfInterrupt();
    }
    /*
    Node.EXCLUSIVE=null 也就是mode=null
    addWaiter:添加等待队列里即:入队
    */
    private Node addWaiter(Node mode) {
       
         /*
         当前还没有队列,所以这段代码先构建队列
        */
        Node node = new Node(Thread.currentThread(), mode);
        /*
        Node(Thread thread, Node mode) { 
            this.nextWaiter = mode;
            this.thread = thread;
        }
        */
        // Try the fast path of enq; backup to full enq on failure
        //tail:尾结点 pre:当前节点的上一个节点
        //thread1进来时,tail=null,pre=null
        Node pred = tail;
        //thread1不走这段逻辑
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //thread1走这里
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        for (;;) {
            //tail=null
            Node t = tail;
            if (t == null) { // Must initialize
                 //通过cas设置head=new Node()
                if (compareAndSetHead(new Node()))
                    //尾结点指向head节点
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取当前节点的前驱节点,thread1其实拿到的就是head节点
                final Node p = node.predecessor();
                //如果当前节点的前驱节点是head节点,就尝试获取一次锁
                //如果获取锁成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //thread1不满足上面的逻辑,会走下面的分支进行阻塞当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //第一次进来thread1的前节点的waitStatus=0
        //第二次进来thread1的前节点的waitStatus=-1 这时返回true,就会进到parkAndCheckInterrupt()这个逻辑里面
        if (ws == Node.SIGNAL)
            return true;
        //[前节点]线程无效的情况下,把无效节点剔除即:把当前节点的前节点设置成,当前节点的前节点的前节点
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //重新设置前节点的next节点为当前节点(双向链表)
            pred.next = node;
        } else {
            //thread1会走这条分支,设置当前节点的前节点的waitStatus=-1,即:可以被唤醒的状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
      }
      private final boolean parkAndCheckInterrupt() {
        //阻塞thread1线程,使其进行等待状态
        LockSupport.park(this);
        //返回当前线程的中断状态true或者false
        return Thread.interrupted();
    }
}

thread2情况大致和thread1情况一致可以参考thread1

4.1.2 thread0锁释放以及唤醒thread1逻辑

image.png
public class ReentrantLock implements Lock, java.io.Serializable {
    ...
    public void unlock() {
        sync.release(1);
    }
    abstract static class Sync extends AbstractQueuedSynchronizer {
      //尝试释放锁
      protected final boolean tryRelease(int releases) {
            //状态-1
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                //把当前独占的线程设置为null
                setExclusiveOwnerThread(null);
            }
            //释放锁
            setState(c);
            return free;
      }
    }
}


public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    public final boolean release(int arg) {
        if (tryRelease(arg)) {//尝试释放锁成功
            //获取头结点
            Node h = head;
            if (h != null && h.waitStatus != 0)//头结点不为空并且头结点的waitStatus!=0
                unparkSuccessor(h);//找到头结点的next节点,唤醒next节点的线程即:thread1线程被唤醒
            return true;
        }
        return false;
    }
    private void unparkSuccessor(Node node) {
        //
        int ws = node.waitStatus;
        //头结点线程先设定位0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //找到头结点的下一个节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //唤醒下一个节点即:唤醒thread1
        if (s != null)
            LockSupport.unpark(s.thread);
    }
}

thread1被唤醒后的出队逻辑

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取当前node的前驱节点此时肯定是head节点了
                final Node p = node.predecessor();
                //尝试加锁,加锁成功,后出队逻辑
                if (p == head && tryAcquire(arg)) {//1.尝试加锁即:设置state=0,设置独占线程为当前线程即:thread1
                    //2.设置当前节点为head节点,把原来的head节点剔除
                    setHead(node);
                    //3.把原有head节点的next指针清理掉,原head的节点清理干净。
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//在这里被唤醒后再次进入循环体尝试加锁tryAcquire(arg)
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
     private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
}

AQS具备的特性:
阻塞等待队列(条件队列、同步队列)
共享/独占
公平/非公平
可重入
允许中断

相关文章

网友评论

      本文标题:6.AQS(抽象同步帮助器)

      本文链接:https://www.haomeiwen.com/subject/cdgxuktx.html