美文网首页
利用LockSupport实现互斥锁和共享锁

利用LockSupport实现互斥锁和共享锁

作者: LNAmp | 来源:发表于2016-12-07 18:24 被阅读245次

    前言

    首先说说LockSupport吧,它的作用是提供一组直接block或unblock线程的方法,其底层实现利用了Unsafe(前面文章有讲过Unsafe)。LockSupport是一个非常底层的API,我们利用其可以做很多事情,本文将利用LockSupport实现互斥锁和共享锁。

    Lock

    在JDK中已经提供了很多种锁的实现,原生的synchronized(优先推荐使用),juc中的ReentrantLock等,本文不纠结synchronized和ReentrantLock的实现,本文只从Lock的语义出发实现两种锁。

    Lock的语义

    juc中对于Lock接口的定义如下:

        void lock();
    
        void lockInterruptibly() throws InterruptedException;
    
        boolean tryLock();
    
        boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;
    
        void unlock();
    
        Condition newCondition();
    
    • void lock():获取锁的语义,如果没有获取到锁一直阻塞当前线程(不响应中断interrupt)
    • void lockInterruptibly() throws InterruptedException; 获取锁,但是当前线程在阻塞期间可以响应中断(后面稍微会扯一下InterruptedException)
    • boolean tryLock(); 尝试获取锁,不阻塞;获取到锁返回true,没有获取到返回false
    • boolean tryLock(long var1, TimeUnit var3) throws InterruptedException; 尝试获取锁,并尝试阻塞等待一定时间,阻塞期间可以响应中断
    • void unlock(); 释放锁;
    • Condition newCondition();在锁上新建Condition

    以上的关于锁的语义稍微复杂了点,特别是相应中断部分和newCondition部分,所以这次实现上简化了Lock的语义如下:

        void lock();
    
        void unLock();
    
        boolean tryLock();
    
        boolean tryLock(long maxWaitInMills);
    

    基本功能和上面保持一致,但是都不响应中断

    分析锁的实现

    • Lock有可重入的语义,一个线程拥有锁之后再次调用lock应该完全没有任何问题,所以锁的实现中需要维护一个已经获取锁的线程队列;
    • Lock未成功需要阻塞当前线程,所以需要底层阻塞原语(LockSupport)等的支持,并且在有线程释放锁之后需要唤起阻塞线程进行锁的竞争,所以需要维护等待锁的线程队列
    • Lock需要维护当前锁的状态(是否可以被获取等)

    互斥锁

    public class MutexLock implements Lock {
    
    
        private volatile Thread threadOwnsTheLock;
    
        private final AtomicInteger state = new AtomicInteger(0);
    
        private final ConcurrentLinkedQueue<Thread> waitThreadsQueue = new ConcurrentLinkedQueue<Thread>();
    
    
        //一直等待
        public void lock() {
            tryLock(-1L);
        }
    
        //invoke all的语义,也可以做invokeNext
        public void unLock() {
            tryRelease(-1);
            threadOwnsTheLock = null;
            if (!waitThreadsQueue.isEmpty()) {
                for (Thread thread : waitThreadsQueue) {
                    LockSupport.unpark(thread);
                }
            }
        }
    
        public boolean tryLock() {
            if (threadOwnsTheLock != null && (threadOwnsTheLock == Thread.currentThread())) {
                return true;
            }
            if (tryAcquire(1)) {
                threadOwnsTheLock = Thread.currentThread();
                return true;
            }
    
            return false;
        }
    
        //没有实现interrupt的语义,不能打断
        public boolean tryLock(long maxWaitInMills) {
            Thread currentThread = Thread.currentThread();
            try {
                waitThreadsQueue.add(currentThread);
                if (maxWaitInMills > 0) {
                    boolean acquired = false;
                    long left = maxWaitInMills * 1000L * 1000L;
                    long cost = 0;
                    while (true) {
                        //需要判断一次interrupt
    
                        if (tryAcquire(1)) {
                            threadOwnsTheLock = currentThread;
                            acquired = true;
                            break;
                        }
    
                        left = left - cost;
                        long mark = System.nanoTime();
                        if (left <= 0) {
                            break;
                        }
                        LockSupport.parkNanos(left);
                        cost = mark - System.nanoTime();
                    }
                    return acquired;
                }else {
                    while (true) {
                        if (tryAcquire(1)) {
                            threadOwnsTheLock = currentThread;
                            break;
                        }
                        LockSupport.park();
                    }
                    return true;
                }
            } finally {
                waitThreadsQueue.remove(currentThread);
            }
    
        }
    
        protected boolean tryAcquire(int acquire) {
            return state.compareAndSet(0, 1);
        }
    
        protected void tryRelease(int release) {
            if (threadOwnsTheLock == null || (threadOwnsTheLock != Thread.currentThread())) {
                System.out.println("Wrong state, this thread don't own this lock.");
            }
            while (true) {
                if (state.compareAndSet(1, 0)) {
                    return;
                }
            }
        }
    }
    

    以上互斥锁使用了一个AtomicInteger,利用了CAS来维持锁的状态

    共享锁

    public class ShareLock implements Lock {
    
        private volatile Set<Thread> threadsOwnsLock = Sets.newConcurrentHashSet();
    
        private final AtomicInteger state;
    
        private final ConcurrentLinkedQueue<Thread> waitThreadsQueue = new ConcurrentLinkedQueue<Thread>();
    
        public ShareLock(int shareNum) {
            this.state = new AtomicInteger(shareNum);
        }
    
    
        //一直等待
        public void lock() {
            tryLock(-1L);
        }
    
        public void unLock() {
            tryRelease(-1);
            threadsOwnsLock.remove(Thread.currentThread());
            if (!waitThreadsQueue.isEmpty()) {
                for (Thread thread : waitThreadsQueue) {
                    LockSupport.unpark(thread);
                }
            }
        }
    
        public boolean tryLock() {
            if ( !(threadsOwnsLock.contains(Thread.currentThread()))) {
                return true;
            }
            if (tryAcquire(1)) {
                threadsOwnsLock.add(Thread.currentThread());
                return true;
            }
    
            return false;
        }
    
        public boolean tryLock(long maxWaitInMills) {
    
    
            Thread currentThread = Thread.currentThread();
            try {
                waitThreadsQueue.add(currentThread);
                if (maxWaitInMills > 0) {
                    boolean acquired = false;
                    long left = TimeUnit.MILLISECONDS.toNanos(maxWaitInMills);
                    long cost = 0;
                    while (true) {
                        if (tryAcquire(1)) {
                            threadsOwnsLock.add(Thread.currentThread());
                            acquired = true;
                            break;
                        }
    
                        left = left - cost;
                        long mark = System.nanoTime();
                        if (left <= 0) {
                            break;
                        }
                        LockSupport.parkNanos(left);
                        cost = mark - System.nanoTime(); //有可能是被唤醒重新去获取锁,没获取到还得继续等待剩下的时间(并不精确)
                    }
                    return acquired;
                }else {
                    while (true) {
                        if (tryAcquire(1)) {
                            threadsOwnsLock.add(Thread.currentThread());
                            break;
                        }
                        LockSupport.park();
                    }
                    return true;
                }
            } finally {
                waitThreadsQueue.remove(currentThread);
            }
    
        }
    
        protected boolean tryAcquire(int acquire) {
            if (state.getAndDecrement() > 0) {
                return true;
            } else {
                state.getAndIncrement();//恢复回来
                return false;
            }
        }
    
        protected void tryRelease(int release) {
            if (!(threadsOwnsLock.contains(Thread.currentThread()))) {
                System.out.println("Wrong state, this thread don't own this lock.");
            }
            state.getAndIncrement();
        }
    }
    

    总结

    以上利用了LockSupport来实现了互斥锁和共享锁,但是实现中并没有完成中断响应。后面应该会有文章单独说明关于InterruptedException的注意点。下篇文章将讲述如何利用LockSupport实现Future语义

    相关文章

      网友评论

          本文标题:利用LockSupport实现互斥锁和共享锁

          本文链接:https://www.haomeiwen.com/subject/xwqjmttx.html