美文网首页并发
java并发(五)Lock以及AbstractQueuedSyn

java并发(五)Lock以及AbstractQueuedSyn

作者: 黄金矿工00七 | 来源:发表于2018-06-25 19:07 被阅读141次

在jdk1.5之前,java中锁功能的实现只能依靠synchronized来实现,在这之后,并发包中新增了Lock接口用来支持新的锁功能。
我们先来看一下jdk8中对Lock的描述,相对于synchronized关键字而言,能够获得更多对锁的操作,也更加灵活,可以与Condition联合使用以应对不同的场景,但是同时也增加了并发编程的复杂度。
我们首先来看一下,与synchronized的区别,文档中是这样描述的:

  • synchronized
    使用synchronized关键字的时候,加锁与释放锁总是以 block-structured 的方式,也就是我们必须对整个代码块加锁或者释放锁(当然这隐式的,由jvm控制的),当获取到多个锁时,释放的顺序必须是相反的,而且所有已经获得的锁的释放必须是在加锁的语义范围内。我们举几个例子来看一下:
public class Test {

  static final Test object = new Test();
  static final Test object1 = new Test();

  public static void main(String[] args) throws InterruptedException {
    Test o = new Test();
    Thread t1 = new Thread(new Runnable() {
      @Override
      public void run() {
        o.test();
      }
    });
    Thread t2 = new Thread(new Runnable() {
      @Override
      public void run() {
        o.test2();
      }
    });
    t1.start();
    //先启动线程1
    Thread.sleep(100);
    t2.start();

  }

  public void test() {

    synchronized (object) {
      //进入同步快,获取锁
      try {
        System.out.println(Thread.currentThread().getId() + "获得锁0");
        synchronized (object1) {
          System.out.println(Thread.currentThread().getId() + "获得锁1");
          //模拟任务
          Thread.sleep(3000);
          System.out.println(Thread.currentThread().getId() + "释放锁1");
        }
        //目的是为了能够看出先释放锁1
        Thread.sleep(3000);
        //当前同步快结束,释放锁
        System.out.println(Thread.currentThread().getId() + "释放锁0");
      } catch (InterruptedException e) {

      }
    }
  }

  public void test2() {
    synchronized (object1) {
      System.out.println("别的线程释放锁1,我获取锁1");
    }
  }
}
执行结果:13获得锁0
13获得锁1
13释放锁1
别的线程释放锁1,我获取锁1
13释放锁0

大家可以自己运行上面的例子,通过上面的程序我们很容易就可以理解synchronized的加锁释放锁的顺序以及范围。
相比于synchronized,Lock接口提供的方法更加灵活,不是必须的块状加锁,你可以自由的控制加锁顺序。但是自由也带来了很多问题,lock锁不会自动释放,需要注意的是Lock实现提供的锁当中,加锁必须对应释放锁,也就是说加一次,必须释放一次,尤其是在不同作用域是,要确保是否加锁,必要的时候释放锁,但是顺序并不做要求;
一般我们遵循下面的语法:不在try中获取锁是因为获取锁时发生异常会导致锁被释放掉

 * Lock l = ...;
 * l.lock();
 * try {
 *   // access the resource protected by this lock
 * } finally {
 *   l.unlock();
 * }}

下面我们看一下Lock接口的方法

  • Lock接口
   /**获取锁,调用该方法当前线程将会获取锁,获得锁后返回(阻塞)
    */
   void lock();
   /**可中断的获得锁,在获取锁的过程中可以响应中断
    */
   void lockInterruptibly() throws InterruptedException;
   /**非阻塞的获取锁,调用后立即返回,获得返回true,否则返回false
    */
    boolean tryLock();
   /**非阻塞的超时获取锁,调用后在以下三种情况返回,获得返回true,否则返回false
    *在超时时间内获得锁
    *超时时间被线程被中断
    *超时时间结束
    */
   boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
   /**释放锁
    */
   void unlock();
   /**获取通知组件,通俗的说给相当于wait和notify,只有获得锁后才可以调用(wait同样会释放锁)
    */
   Condition newCondition();
AbstractQueuedSynchronizer

在说AbstractQueuedSynchronizer之前,我们先来下AbstractOwnableSynchronizer,这是AQS的基类,在java源码中,对他的定义是这样的:一个被线程独占的同步器,也就是通俗意义上的排他锁,但是这个类并不具有锁的功能,他只是提供了创建这些锁或者同步器的基础(也就是当前独占锁的线程),我们在实现一个同步器的时候可以使用这些信息来实现对线程管理相关的功能。
他提供了两个基础方法

  • setExclusiveOwnerThread(Thread thread)// 设置当前独占锁的线程
  • Thread getExclusiveOwnerThread()//获取当前独占锁的线程

下面我们看一下锁的另外一个重要组件AbstractQueuedSynchronizer,AbstractQueuedSynchronizer是java并发包中很重要的队列同步器,他为ReentrantLock、semaphore、countdownLatch、以及线程池中worker等依赖等待队列的同步工具和锁提供了基础框架支持。并发包中的锁(在其他同步工具中没有实现Lock接口而已)基本都是通过实现Lock接口并且内部聚合一个或多个AQS的子类来实现线程同步的,可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。
我们来看一张UML图,了解一下锁的实现

并发包的锁
AbstractQueuedSynchronizer在内部维护了一个volatile int state用来表示同步状态和一个FIFO双向队列(这个队列并不是容器化的,只是通过保存头指针和尾指针来维护虚拟的链表)。
子类通过继承AQS并且实现它的抽象方法来实现同步功能。同步功能的基础建立在对同步状态的管理上,AQS提供了三个方法来管理同步状态:
  • getState()
  • setState()//一般用来当前线程已经获得共享资源时,+1(重入)
  • compareAndSetState()//一般用来当前线程竞争共享资源时
    state在不同的同步器实现中意义是不同的,例如在ReentrantLock中state初始化为0,每次同一线程获得锁后,state+1,其他线程想要获得锁,只能等state为0。在CountDownLatch中则是一个计数器,每次调用release方法,计数器-1,线程阻塞,直到计数值为0,解除所有线程阻塞;在semaphore中,则表示共享资源的数量(或者说许可的数量),每次调用acquire则许可-1,调用release许可+1,其余的实现可以参考java源码。
    同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来
    供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态。
  • AQS定义两种资源共享方式:
    • Exclusive(独占,如ReentrantLock)
    • Share(共享,如Semaphore、CountDownLatch、ReentrantReadWriteLock)
  • 自定义同步器实现时主要实现以下几种方法:
    • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
    • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
    • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
    • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    • tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。
      在自定义同步器实现上述方法之后,只需调用AQS提供的模板方法即可。如下图:


      AQS模板方法
下面我们看一下AQS是如何实现同步的:
  • 同步队列
    同步队列是AQS内部所依赖的管理同步状态FIFO双向队列。
    我们来看一下队列的实现
    AQS定义了了一个内部Node类来实现队列
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//等待队列中线程当前的状态
volatile int waitStatus;//
volatile Node prev;//
volatile Node next;//
volatile Thread thread;//获取同步状态的线程
Node nextWaiter;//队列中的

waitStatus有四种:
static final int CANCELLED = 1;//在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
static final int SIGNAL = -1;//后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
static final int CONDITION = -2;//节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
static final int PROPAGATE = -3;//表示下一次共享式同步状态获取将会被无条件地传播下去

同步队列基本结构
我们来看一下同步队列是如何构造的,也就是同步状态的获取与释放
在独占模式下,
      //tryAcquire()是由实现类重写的,在重入锁中在详说
        public final void acquire(int arg) { 
         if (!tryAcquire(arg) &&  
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  
         selfInterrupt();    
        }      

当前线程尝试直接去获取资源,如果成功则直接返回,当线程获取同步状态失败时,同步器会将当前线程以及waitStatus等信息构造成一个Node节点,并将其加入同步队列的尾部(使用CAS),同时会阻塞当前线程,当同步状态释放时,会唤醒队列中的头结点,让其再次尝试获取同步状态。

  • JDK8实现

      private Node addWaiter(Node mode) {
            //构造节点
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
              //获取当前尾指针
              Node pred = tail;
            //如果队列不为空
            if (pred != null) {
                //设置要入队节点的前节点为尾结点
                 node.prev = pred;
                 if (compareAndSetTail(pred, node)) {
                  //CAS设置新尾结点成功则返回新尾结点
                 pred.next = node;
                 return node;
           }
         }
        //若队列为空,则进行入队操作
        enq(node);
         return node;
      }  
          private Node enq(final Node node) {
              for (;;) {//自旋进行下面的cas入队操作
                   Node t = tail;
                      //再次判断队列是否为空
                   if (t == null) { // Must initialize
                        //初始化头指针和尾指针
                       if (compareAndSetHead(new Node()))
                       tail = head;
                       } else {//不为空,则跟上面同样的操作
                         node.prev = t;
                       if (compareAndSetTail(t, node)) {
                             t.next = node;
                             return t;
                        }
                    }
               }
           }
    

在加入同步队列之后,调用acquireQueued(Node node,int arg)方法,我们来看一下操作流程

 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {//当前线程自旋尝试获取同步状态
                  //获取当前节点的前节点
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//head.next为头结点
                    //如果当前节点是头结点并且当前节点获取同步状态成功,设置当前节点为头指针,移除掉旧的头指针
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;//
                    return interrupted;//响应中断
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //在获取失败之后检查该线程是否应该被阻塞(简单点说,就是我通知过我的前节点,你干完了记得叫醒我),这里使用的是LockSupport的park,后面再说
                    interrupted = true;//响应中断
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //waitStatus上面已经说明过,获取前一个节点的waitStatus
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 前节点已经通知过当同步状态被释放的时候会唤醒它,所以他可以安全的阻塞
             */
            return true;
        if (ws > 0) {
            /*
             * 前节点任务已经被取消了,跳过前面被取消节点, 一直找到未被取消的,当前节点作为他的后继节点
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 为初始化的状态,表示我们我还没有通知前面的节点,你释放的时候叫醒我
             * 我们要再次确认一下,万一你释放了呢,通知完了我就可以睡觉了,前提是我没被中断打扰
             * 
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

说完了同步状态的获取,我们再来说一下同步状态的释放

独占式同步状态释放

不多说,上源码

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
          //如果成功释放掉
            Node h = head;
           //  当前同步队列不为空,并且头指针被初始化过了
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
 private void unparkSuccessor(Node node) {
        /*
         * 如果状态是负的,也就是说需要被唤醒
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
          //找到 头结点,就是需要被唤醒的阶段
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
          //如果waitStatus>0 ,也就是说被取消了,把这个节点移除
            s = null;
            //那就从尾部往前找,找到没被取消的,并且队列不会空
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //唤醒这个线程
            LockSupport.unpark(s.thread);
    }

相关文章

网友评论

    本文标题:java并发(五)Lock以及AbstractQueuedSyn

    本文链接:https://www.haomeiwen.com/subject/cmlyyftx.html