美文网首页
聊聊高并发(九)实现几种自旋锁(四)

聊聊高并发(九)实现几种自旋锁(四)

作者: wenming6688 | 来源:发表于2018-08-23 16:01 被阅读0次

    这篇看一下时限队列锁的一种实现方式。 java并发包中的Lock定义包含了时限锁的接口:

    public interface Lock {
     
        void lock();
     
        void lockInterruptibly() throws InterruptedException;
     
        boolean tryLock();
     
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
     
        void unlock();
     
        Condition newCondition();
    }
    

    tryLock就是实现锁的接口,它支持限时操作,支持中断操作。这两个特性很重要,可以防止死锁,也可以在死锁的情况下取消锁。

    因为这两个特性的需要,队列锁的节点需要支持“退出队列”的机制,也就是说当发生超时或者线程中断的情况下,线程能从队列中出队列,不影响其他节点继续等待。之前实现的几种队列锁都不支持退出机制,一旦发生队列中的线程长时间阻塞,那么后续所有的线程都会被动阻塞。

    我们看一种限时队列锁的实现,它有几个要点:

    1. 定义一个共享的AVAILABLE节点,当一个节点的preNode指向AVAILABLE时,表示这个节点获得锁

    2. QNode节点维护一个preNode引用,这个引用只有当获得锁时,会指向AVAILABLE,或者超时了会指向它的前一个节点,其他等待锁的时候都是Null,因为一旦一个节点超时了,需要让它的后续节点指向它的前驱节点,所以只有超时的时候会给preNode设置值(指向AVAILABLE节点除外)。

    3. 使用一个AtomicReference原子变量tail来形成一个虚拟的单向链表结构。tail的getAndSet操作会返回之前的节点的引用,相当于获得了前驱节点。当获得锁后,前驱节点引用就释放了,前驱节点就可以被GC回收

    4. 支持中断操作,Thread.isInterrupted()可以获得线程中断的信息,一旦获取中断信息,就抛出中断异常。需要注意的时,线程中断信息发出时,并不是要求线程马上中断,而是告知了线程要中断的信息,程序自己控制中断的地点。

    5. 由于线程只有一个ThreadLocal的myNode变量指向自己的节点,所以获取锁时,使用了每次new一个新的Node,并设置给线程的方式,避免unlock时对node的操作影响后续节点的状态,也可以使线程多次获得锁。这里可以考虑像CLHLock那样,维护两个ThreadLocal的引用,释放锁时把myNode的引用指向已经不使用的前驱节点,这样避免无谓的new操作。

    package com.zc.lock;
     
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;
     
    /**
     * 时限队列锁,支持tryLock超时操作
     * QNode维护一个指针preNode指向前一个节点。当preNode == AVAILABLE表示已经释放锁。当preNode == null表示等待锁
     * tail维护一个虚拟链表,通过tail.getAndSet方法获得前一个节点,并在前一个节点自旋,当释放锁时前一个节点的preNode == AVAIABLE,自动通知后一个节点获取锁
     * 当一个节点超时或者被中断,那么它的前驱节点不为空。后续节点看到它的前驱节点不为空,并且不是AVAILABLE时,知道这个节点退出了,就会跳过它
     * 当节点获得锁,进入临界区后,它的前驱节点可以被回收
     * **/
    public class TimeoutLock implements TryLock{
        // 声明为静态变量,防止被临时回收
        private static final QNode AVAILABLE = new QNode();
        
        // 原子变量指向队尾
        private AtomicReference<QNode> tail;
     
        ThreadLocal<QNode> myNode;
        
        public TimeoutLock(){
            tail = new AtomicReference<QNode>(null);
            myNode = new ThreadLocal<QNode>(){
                protected QNode initialValue(){
                    return new QNode();
                }
            };
        }
        
        @Override
        public void lock() {
            // 和CLHLock不同,每次新建一个Node,并设置给线程,目的是支持同一个线程可以多次获得锁,而不影响链中其他节点的状态
            // CLHLock不需要每次新建Node是因为它使用了两个指针,一个指向前驱节点。而前驱节点释放后就可以回收了。
            // CLHLock每次释放锁时设置myNode为失效的前驱节点,也是为了支持同一个线程可以多次获取锁而不影响其他节点
            QNode node = new QNode();
            myNode.set(node);
            QNode pre = tail.getAndSet(node);
            if(pre != null){
                // 在前一个节点自旋,当前一个节点是AVAILABLE时,表示它获得锁
                while(pre.preNode != AVAILABLE){
                    
                }
            }
        }
     
        @Override
        public void unlock() {
            QNode node = myNode.get();
            // CAS操作,如果为true,表示是唯一节点,直接释放就行;否则把preNode指向AVAILABLE
            if(!tail.compareAndSet(node, null)){
                node.preNode = AVAILABLE;
            }
        }
        
        @Override
        //TimeUnit只支持毫秒
        public boolean trylock(long time, TimeUnit unit) throws InterruptedException {
            if(Thread.interrupted()){
                throw new InterruptedException();
            }
            boolean isInterrupted = false;
            long startTime = System.currentTimeMillis();
            long duration = TimeUnit.MILLISECONDS.convert(time, unit);
            // 注意:每次tryLock都要new新的Node,为了同一个线程可以多次获得锁。如果每个线程都使用同一个节点,会影响链中其他的节点
            QNode node = new QNode();
            myNode.set(node);
            // 尝试一次获取锁
            QNode pre = tail.getAndSet(node);
            // 第一个节点或者之前的节点都是已经释放了锁的节点, pre==AVAILABLE表示获得了锁
            if(pre == null || pre == AVAILABLE){
                return true;
            }
            // 在给定时间内对preNode自旋
            while((System.currentTimeMillis() - startTime < duration) && !isInterrupted){
                QNode predPreNode = pre.preNode;
                // 表示前一个节点已经释放了锁,设置了preNode域,否则preNode域为空
                if(predPreNode == AVAILABLE){
                    return true;
                }
                // 当prePreNode != null时,只有两种情况,就是它超时了,或者被中断了。
                // 跳过prePreNode不为空的节点,继续自旋它的下一个节点
                else if(predPreNode != null){
                    pre = predPreNode;
                }
                if(Thread.interrupted()){
                    isInterrupted = true;
                }
            }
            
            // 超时或者interrupted,都要设置node的前驱节点不为空
            node.preNode = pre;
            
            if(isInterrupted){
                throw new InterruptedException();
            }
            
            return false;
        }
        
        public static class QNode {
            volatile QNode preNode;
        }
        
        public String toString(){
            return "TimeoutLock";
        }
     
    }
    

    TimeoutLock具备所有CLHLock的特性,比如无饥饿,先来先服务的公平性,在多个共享变量上自旋,从而控制合理的缓存一致性流量等等,并且支持了限时操作和中断操作。

    使用限时锁时有固定的模板,防止锁被错误使用。

    Lock lock = ...;
    if (lock.tryLock()) {
         try {
         // manipulate protected state
         } finally {
            lock.unlock();
         }
    } else {
        // perform alternative actions
    }
    

    同样,我们之前验证锁正确性的测试用例同样对TimeoutLock有效,这里不重复帖代码了。

    相关文章

      网友评论

          本文标题:聊聊高并发(九)实现几种自旋锁(四)

          本文链接:https://www.haomeiwen.com/subject/vukmiftx.html