美文网首页
Java线程-Lock学习(五)

Java线程-Lock学习(五)

作者: 骑着乌龟去看海 | 来源:发表于2018-07-28 15:20 被阅读90次

    一、前言

      上一篇线程文章中,我们了解了通过synchronized关键字来实现线程同步,而本篇文章,我们来学习下Lock相关的内容。从Java 5之后,在java.util.concurrent.locks包下提供了另外一种方式来实现同步访问,那就是Lock。

    1 为什么要有Lock?
    1. 前面我们知道,使用synchronized关键字能够实现线程同步的功能,如果一个线程获取了对应的锁,那么相关线程便只能一直等待,直到获取线程锁的线程释放掉锁,而如果获取线程锁的线程由于IO或者其他各种原因阻塞了,那其他线程便只能等着了,毫无疑问,这将会极大的影响程序的执行效率。
    2. 再举个例子:当多个线程读取同一个文件时,正常来说,读操作和写操作会发生冲突现象,写操作和写操作会发生冲突现象,但是读操作和读操作不会发生冲突现象。如果使用synchronized来处理的话,那么当多个线程都只是进行读操作的话,就会出现问题,根据synchronized的特性,同一个时间段内,只能有一个线程进行读取,其他线程将会进入等待中。

      而上面这两个问题,都可以通过Lock来解决,Lock还可以知道线程有没有成功获取到锁,也就是Lock提供了比synchronized更多的功能。

    二. Lock体系

    1. Lock接口

      首先,Lock不是Java的关键字,Lock是一个接口,Lock和其相关的实现构成了Lock体系,并且Lock体系是Java 5.0 才引入的。我们先来看一下Lock接口中的方法列表,由于Lock中的每个方法都很重要,所以我们一会再单独介绍下每个方法:

    public interface Lock {
        void lock();
        void lockInterruptibly() throws InterruptedException;
        boolean tryLock();
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
        void unlock();
        Condition newCondition();
    }
    

    这里先说下,Lock锁的释放不同于synchronized,synchronized方法或代码块执行完成之后或发生异常之后,JVM会自动让线程释放锁,而Lock则必须需要用户去手动释放锁,如果没有主动释放锁,则可能会导致死锁现象,这点需要注意。

    下面我们来挨个看下这些方法的用途:

    1.1 lock方法

      lock方法用来获取锁,如果锁被其他线程获取,则该线程进行等待;由于Lock必须要手动去释放锁,所以使用Lock的通常做法是在try{} catch{} finally中进行,然后将锁的释放放到finally块中执行:

    Lock lock = ...;
    lock.lock();
    try{
        //处理任务
    }catch(Exception e){
    
    }finally{
        lock.unlock();   //释放锁
    }
    
    1.2 tryLock方法
    1. tryLock方法也是用于获取锁,该方法有返回值,返回值为true,说明获取锁成功,返回false则表示获取锁失败(比如锁被其他线程占用中),该方法无论如何都会立刻返回,在拿不到锁时不会一直等待;
    2. tryLock(long time, TimeUnit unit)方法和tryLock方法类似,区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false,而如果一开始拿到锁或者在等待期间内拿到了锁,则返回true;所以,一般通过tryLock来获取锁是这样的:
    3. 有的时候tryLock锁的这种获取方式也称为可定时,可轮询的,与lock方法相比,它具有更完善的错误恢复机制,可以有效避免死锁的发生。
    Lock lock = ...;
    if(lock.tryLock()) {
        try{
            //处理任务
        }catch(Exception ex){
    
        }finally{
            lock.unlock();   //释放锁
        }
    }else {
        //如果不能获取锁,则直接做其他事情
    }
    
    1.3 lockInterruptibly方法

      这个方法也是用来获取锁的,不过这个方法有些特殊。当通过这个方法去获取锁时,如果没有获取到,线程进入等待状态,但该线程能够响应中断,即中断线程的等待状态。也就使说,当两个线程同时通过lock.lockInterruptibly()想获取某个锁时,假若此时线程A获取到了锁,而线程B将进入等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程。

    另外需要注意下:

    1. 该方法的操作略微复杂些,由于该方法会抛出InterruptedException异常,所以需要两个try块(而如果在操作的过程中向上抛出了InterruptedException,则使用一个标准的try-finally即可);
    2. 当一个线程已经获取到了锁之后,是不会被interrupt方法中断的,这个前面学习的时候已经讲过。
    3. 因此当通过lockInterruptibly()方法获取某个锁时,如果不能获取到,只有进行等待的情况下,是可以响应中断的。而用synchronized修饰的话,当一个线程处于等待某个锁的状态,是无法被中断的,只有一直等待下去;
    private static void test() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // 执行逻辑
        } finally {
            lock.unlock();
        }
    }
    

    这里再说下,定时的tryLock方法同样能响应中断。所以如果需要一个定时的和可中断的获取操作时,可以使用tryLock方法。

    1.4 unlock方法

      这个方法就不多说了,用于释放锁,由于Lock必须要手动释放锁,所以调用该方法的时候,最好放到finally块中,这样无论程序是否正常执行结束都将释放锁。

    1.5 newCondition方法

    该方法用于创建一个Condition对象,至于Condition对象,后面会学习到,这里就暂且不介绍了。

    2. ReentrantLock类

      接下来,我们来看下Lock的实现类:ReentrantLock,意思是可重入锁。该类在实现了Lock接口中的方法之外,还额外提供了一些方法,我们先通过一些例子来了解一下方法的使用。

    2.1 lock方法测试
    public class ThreadTest  {
        public static void main(String[] args) {
            new Thread(() -> test(Thread.currentThread())).start();
            new Thread(() -> test(Thread.currentThread())).start();
        }
        private static void test(Thread thread) {
            Lock lock = new ReentrantLock();
            lock.lock();
            try {
                System.out.println("线程" + thread.getName() + "得到锁");
            } finally {
                System.out.println("线程"+ thread.getName() + "释放锁");
                lock.unlock();
            }
        }
    }
    

    首先,我们先看下上面的简单例子,打印结果(每次调用可能不同):

    线程Thread-0得到锁
    线程Thread-1得到锁
    线程Thread-1释放锁
    线程Thread-0释放锁
    

    可以看到,结果似乎和我们预想的不太一样,两个线程都得到了锁,这是由于lock变量的问题,因为lock是一个局部变量,所以两个线程都new 了一个lock,lock对象不同,那么自然通过lock得到的锁也是不同的锁,也不会发生冲突啥的。我们稍作改变:

    public class ThreadTest  {
        private static Lock lock = new ReentrantLock();
        public static void main(String[] args) {
            new Thread(() -> test(Thread.currentThread())).start();
            new Thread(() -> test(Thread.currentThread())).start();
        }
        private static void test(Thread thread) {
            lock.lock();
            try {
                System.out.println("线程" + thread.getName() + "得到锁");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("线程"+ thread.getName() + "释放锁");
                lock.unlock();
            }
        }
    }
    

    这次我们可以debug调试下看下结果。

    2.2 tryLock方法测试

    我们对上面的test方法稍作调整,来简单看下tryLock的使用:

    private static void test(Thread thread) {
        if (lock.tryLock()) {
            try {
                System.out.println("线程" + thread.getName() + "得到锁");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("线程"+ thread.getName() + "释放锁");
                lock.unlock();
            }
        } else {
            System.out.println("线程" + thread.getName() + "没有获取到锁,立即返回!");
        }
    }
    

    打印结果:

    线程Thread-0得到锁
    线程Thread-1没有获取到锁,立即返回!
    线程Thread-0释放锁
    

    其他相关方法就不多说了,我们来简单看下ReentrantLock的公平锁属性。

    2.3 ReentrantLock的公平性

    我们来看下ReentrantLock的源码:

    private final Sync sync;
    
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
    
        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    }
    
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
    
        final void lock() {
            acquire(1);
        }
    }
    

    从源码中我们可以看到,ReentrantLock定义了两个静态内部类,NonfairSync和FairSync,分别用来实现非公平锁及公平锁,另外,ReentrantLock定义了一个sync属性,用来表示该Lock的公平性,可以通过isFair方法来判断该Lock是否是公平锁:

    public final boolean isFair() {
        return sync instanceof FairSync;
    }
    

    而要指定Lock是否是公平锁,我们可以通过其中的一个构造方法来实现:

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    

    而默认的构造方法,构造的是非公平锁。另外,ReentrantLock还有一些方法:

    isFair()                  //判断锁是否是公平锁
    isLocked()                //判断锁是否被任何线程获取了
    isHeldByCurrentThread()   //判断锁是否被当前线程获取了
    hasQueuedThreads()        //判断是否有线程在等待该锁
    

    另外还有一些方法是用于监控的方法,这里暂不多说了。

    我们来看下java.util.concurrent.locks包中其他需要了解的接口或实现。

    3. ReadWriteLock及ReentrantReadWriteLock

      ReadWriteLock 是个接口,表示读写锁,ReentrantReadWriteLock则是该接口的实现。ReadWriteLock接口只有两个方法,分别是获取读锁和获取写锁,也就是说将文件的读写操作分开,分成2个锁来分配给线程,从而使多个线程可以同时进行读操作:

    public interface ReadWriteLock {
        Lock readLock();
        Lock writeLock();
    }
    

    我们先来看下通过ReadWriteLock实现多个线程同时进行读操作:

    public class ThreadTest {
        private static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        public static void main(String[] args) {
            new Thread(() -> test(Thread.currentThread())).start();
            new Thread(() -> test(Thread.currentThread())).start();
        }
    
        private static void test(Thread thread) {
            rwl.readLock().lock();
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.println("线程" + thread.getName() + "正在进行读操作");
                }
                System.out.println("线程" + thread.getName() + "读操作完毕");
            } finally {
                rwl.readLock().unlock();
            }
        }
    }
    

    来看下结果,为了篇幅,省略了一些打印结果:

    线程Thread-0正在进行读操作
    线程Thread-0正在进行读操作
    线程Thread-1正在进行读操作
    线程Thread-1正在进行读操作
    线程Thread-0正在进行读操作
    线程Thread-0正在进行读操作
    线程Thread-1读操作完毕
    线程Thread-0正在进行读操作
    线程Thread-0读操作完毕
    

    可以看到,多个线程同时进行了读操作。不过需要注意的是:

    1. 除了多个线程可以同时获取读锁之外,其他情况,多个线程都将只有一个线程能获取到锁,其他线程则进入等待,比如一个线程已经获取了读锁,另一个线程想要获取写锁,则会进入等待,直到读锁释放。
    2. ReentrantReadWriteLock也支持公平性,可以设置该锁是公平锁或者非公平锁。

    而读锁与写锁之间是可以有多种交互方式的,比如:

    1. 释放优先:当一个写锁释放时,并且队列中同时存在读线程和写线程,那么应该优先选择读线程还是写线程,还是最先发出请求的线程?
    2. 读线程插队:如果锁是由读线程持有,但有写线程正在等待,那么新的读线程能否立即获得访问权,还是应该在写线程后面等待?如果运行读线程插队到写线程之前,那么有可能造成写线程发生饥饿问题;
    3. 重入性:读锁与写锁是否是可重入的?
    4. 降级:如果一个线程持有写锁,那么它能否在不释放该锁的情况下获得读锁?这就可能使写锁被降级为读锁,同时不允许其他写线程修改资源;
    5. 升级:读锁能否优于其他正在等待的读线程和写线程而升级为一个写锁?大多数的读写锁都不支持升级,因为如果没有显示的升级操作,那么很容易造成死锁(比如两个读线程试图同时升级为写锁,那么二者都不会释放读锁)。
    4. TimeUnit类

    这里我们再来简单说下TimeUnit类。首先,这是个枚举类,也是java.util.concurrent包下的,该类一般有两个作用:

    1. 作为时间的一个粒度,比如我们tryLock的第二个参数就是该类型,表示等待的时间单位是小时,分钟亦或是其他类型的;
    2. 用于线程休眠,可用来代替Thread.sleep,相比Thread.sleep来说,可读性更好,因为Thread.sleep的单位是毫秒,而该类的sleep方法则可以指定具体的时间单位;
    4.1 枚举元素及方法简介

    先看一下枚举的元素:

    TimeUnit.NANOSECONDS        时间单位是纳秒
    TimeUnit.MICROSECONDS       时间单位是微秒
    TimeUnit.MILLISECONDS       时间单位是毫秒
    TimeUnit.SECONDS            时间单位是秒
    TimeUnit.MINUTES            时间单位是分钟
    TimeUnit.HOURS              时间单位是小时
    TimeUnit.DAYS               时间单位是天
    

    再来简单看一下转换方法:

    public long toNanos(long d)               
    public long toMicros(long d)              
    public long toMillis(long d)  
    public long toSeconds(long d) 
    public long toMinutes(long d) 
    public long toHours(long d)   
    public long toDays(long d)    
    public long convert(long d, TimeUnit u) 
    

    这些方法见名知义,就不多说了,再来看一下使用方式:

    //将天转换为小时
    //output: 24
    System.out.println(TimeUnit.DAYS.toHours(1));
    //将小时转换为秒
    //output: 3600
    System.out.println(TimeUnit.HOURS.toSeconds(1));
    
    // 休眠5秒
    Thread.sleep(5 * 1000);
    TimeUnit.SECONDS.sleep( 5 );
    
    // 休眠5分钟
    Thread.sleep(5 * 60 * 1000);
    TimeUnit.MINUTES.sleep(5);
    
    4.2 TimeUnit的sleep方法

      查看下TimeUnit的sleep方法源码可以知道,该方法底层实际是通过Thread.sleep方法来实现的,最终将单位都转换为了毫秒:

    public void sleep(long timeout) throws InterruptedException {
        if (timeout > 0) {
            long ms = toMillis(timeout);
            int ns = excessNanos(timeout, ms);
            Thread.sleep(ms, ns);
        }
    }
    

    所以,如果使用到Thread.sleep的时候,我们可以优先使用TimeUnit的sleep方法。

    4.3 timedJoin 和 timedWait方法

    另外,TimeUnit中还有两个public方法,我们来简单了解下:

    public void timedWait(Object obj, long timeout)
            throws InterruptedException {
        if (timeout > 0) {
            long ms = toMillis(timeout);
            int ns = excessNanos(timeout, ms);
            obj.wait(ms, ns);
        }
    }
    
    public void timedJoin(Thread thread, long timeout)
            throws InterruptedException {
        if (timeout > 0) {
            long ms = toMillis(timeout);
            int ns = excessNanos(timeout, ms);
            thread.join(ms, ns);
        }
    }
    

    看源码就可以看出,timeWait是对Object.wait方法的一个封装,timedJoin则是对Thread.join方法的一个封装,因为这两个方法都支持类似于定时的操作,所以和sleep类似,封装了两个方法,同样,都可以指定相应的时间单位:

    TimeUnit.MINUTES.timedJoin(thread, 5);
    
    synchronized (lock) {
        TimeUnit.MINUTES.timedWait(lock, 5);
    }
    

    三、ReentrantLock源码

    1. ReentrantLock的公平锁的源码

      ReentrantLock 在Java中是一个普通的类,实现方式是基于AQS(AbstractQueuedSynchronizer)来实现的,而AQS是 Java 并发包里实现锁、同步的一个重要的基础框架。接下来我们来简单看下ReentrantLock中有关公平锁和非公平锁的实现源码:

    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {...}
    

    首先,该类中有一个基础的锁对象Sync,并且该锁有两个实现类,公平锁(FairSync)和非公平锁(NonfairSync),我们首先来看下公平锁的实现:

    final void lock() {
        acquire(1);
    }
            
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    在公平锁的实现lock方法中,会先调用基础类AbstractQueuedSynchronizer的acquire方法,然后该方法中会调用尝试获取锁的方法tryAcquire:

    1. 这里第一步是先判断同步状态state的值是否是0,0表示目前没有其他线程获取锁,当前线程可以尝试获取到该锁;
    2. 而当前线程尝试之前,会先通过 hasQueuedPredecessors 该方法来判断AQS的队列中是否有其他线程,如果有则不会尝试获取锁,因为这时公平锁的情况;
    3. 而如果队列中没有线程,就通过CAS方式将AQS的同步状态state设置为1,也就是获取锁成功,然后通过setExclusiveOwnerThread 将当前线程置为获得锁的独占线程;
    4. 而如果c不等于0,其实也就是大于0,说明锁已经获取成功了,然后判断获取锁的线程是否是当前线程(getExclusiveOwnerThread),如果是当前线程,则将state 更新为state + 1(因为ReentrantLock支持可重入);

    而如果通过 tryAcquire方法获取锁失败,则先调用addWaiter将当前线程写入队列中,而写入之前需要将当前线程封装为一个Node对象,因为AQS 中的队列是由 Node 节点组成的双向链表实现的:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    写入队列之后,然后通过acquireQueued方法将当前线程挂起(最终是通过LockSupport的park方法),进入等待状态,等着被唤醒:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

      这里有一个循环自旋的操作,在挂起线程之前,先判断该线程前面的节点是否是head节点,如果是,再次去尝试获取锁,获取成功后,将该节点设置为head节点,然后返回;如果上述条件不成立,调用shouldParkAfterFailedAcquire方法判断是否应该把当前线程挂起,主要是通过节点的waitStatus字段来判断,如果需要,调用LockSupport的part方法挂起线程。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    2. ReentrantLock的非公平锁的源码

    非公平锁(NonfairSync)与公平锁的区别主要在于获取锁,公平锁是一种顺序的方式,而非公平锁则是一种抢占式的方式,来看下源码:

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    

    直接通过CAS来尝试获取锁,如果获取锁成功,然后将当前线程设置为获得锁的独占线程。如果获取不成功,同样会调用acquire方法,然后会调用tryAcquire方法再次尝试获取锁:

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    不过这里不需要判断队列中是否有其他线程,也就是没有hasQueuedPredecessors方法,而是直接尝试获取锁。最后,释放锁,公平锁和非公平锁的方式是一致的,主要就是减少state的值:

    public void unlock() {
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 唤醒被挂起的线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

    释放锁的时候,先判断当前线程是否是获得锁的线程,由于是重入锁,所以需要将state的值减到0才认为完全释放锁,释放锁之后,通过unparkSuccessor方法来唤醒被挂起的线程。

    3. 源码总结
    1. ReentrantLock是通过AQS(AbstractQueuedSynchronizer)来实现的,AQS是Java并发包里一个重要的基础框架,可以实现各种形式的锁,这里只用到了独占锁;
    2. 在线程阻塞的时候,AQS本身会有自旋的操作,并非是获取不到锁就直接阻塞;
    3. 公平锁的实现是底层维护了一个基于双向链表的队列结构;

    源码参考了:ReentrantLock 实现原理

    四、总结

    1. 使用synchronized还是ReentrantLock?

    《Java并发编程实战》的作者在书中写道:

    1. 在synchronized无法满足需求的情况下,ReentrantLock可以作为一种高级工具。当需要一些高级功能时才应该使用ReentrantLock,这些功能包括:可定时,可轮询与可中断的获取锁操作,公平队列以及非块结构的锁。否则,还是应该优先使用synchronized;
    2. 未来更可能会一直提升synchronized而不是ReentrantLock的性能,因为synchronized是JVM的内置属性,它可以执行一些优化,比如线程封闭的锁对象消除优化等等。即使是性能方面,synchronized也不会逊色于ReentrantLock太多。

    本文参考自:
    《Java并发编程实战》
    海子-Java并发编程:Lock

    相关文章

      网友评论

          本文标题:Java线程-Lock学习(五)

          本文链接:https://www.haomeiwen.com/subject/mxpomftx.html