美文网首页程序员
java锁(3)Lock接口及LockSupport工具详解

java锁(3)Lock接口及LockSupport工具详解

作者: 桥头放牛娃 | 来源:发表于2019-07-18 23:01 被阅读8次

    1、Lock接口

    1.1、Lock玉synchronized的不同

    Lock不是Java语言内置的,synchronized是Java语言的关键字,因此是内置特性。Lock是一个类,通过这个类可以实现同步访问;
    Lock和synchronized有一点非常大的不同,采用synchronized不需要用户去手动释放锁,当synchronized方法或者synchronized代码块执行完之后,系统会自动让线程释放对锁的占用;而Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。
    Lock在使用时需要显式地获取和释放锁,虽然其缺少隐式获取锁的便捷性,但却拥有了锁获取与释放的可操作性、可中断的获取锁及超时获取锁等多种synchronized关键字所不具备的同步特性。

    1.2、Lock接口实现

    public interface Lock {
    
        //获取锁,若锁不可用,则当前线程将会阻塞,直到获得锁
        void lock();
    
        //获取锁,当被中断或获取到锁才返回;
        //若锁不可用,则当前线程被阻塞,直到获取锁或被中断
        void lockInterruptibly() throws InterruptedException;
    
       //尝试获取锁,并立即返回;true:获取锁成功;false:获取锁失败
        boolean tryLock();
    
       //尝试在指定的超时时间获取锁,当获取到锁时返回true;当超时或被中断时返回false
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    
       //释放锁
        void unlock();
    
       //返回一个和锁绑定的条件队列;
       //在等待条件之前线程必须先获取当前锁,同时await()方法会原子地释放锁
       //并在返回之前重新获取到锁
        Condition newCondition();
    }
    

    2、LockSupport工具类

    LockSupport定义了一组的公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而LockSupport也成为构建同步组件的基础工具。LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Thread thread)方法来唤醒一个被阻塞的线程。

    2.1、LockSupport的基本特征

    重入性:
    LockSupport是非重入锁;
    面向线程锁:
    这是LockSupport很重要的一个特征,也是与synchronized,object,reentrantlock最大的不同,这样也就没有公平锁和非公平的区别,所以无法只是依靠LockSupport实现单例模式。同时面向线程锁的特征在一定程度上降低代码的耦合度。
    锁park与解锁unpark顺序可颠倒性:
    这个特征虽然是Java锁中比较独特是特征,其实这个特征也是基于LockSupport的状态,类似二元信号量(0和1)实现的。
    解锁unpark的重复性:
    unpark可重复是指在解锁的时候可以重复调用unpark;同样因为LockSupport基于二元锁状态重复调用unpark并不会影响到下一次park操作;

    2.2、LockSupport与其他锁的比较

    基于LockSupport提供的核心的功能,即:park阻塞与unpark恢复运行状态;与此功能类似的有Object的wait/notify和ReentrantLock.Condition的await()、signal()等;
    锁的实现机制不同:
    LockSupport面向的是线程,而Object和ReentrantLock.Condition都是典型的依赖一个对象实现锁机制;

    锁的监视器依赖:
    LockSupport不需要依赖监视器,在源码中可以发现LockSupport并没有提供pubic的构造器,它的所有方法都是静态的;在Object和ReentrantLock.Condition都是需要new一个自身对象作为监视器介质;

    粒度:
    粒度上很显然LockSupport粒度更细;

    使用灵活度:
    LockSupport不需要依赖监视器一定程度上降低耦合而且解锁unpark和锁park顺序灵活;

    2.3、LockSupport的实现

    public class LockSupport {
        private LockSupport() {} // Cannot be instantiated.
    
        //设置阻塞当前线程的操作者,一般保存阻塞操作的线程Thread对象
        //parkBlocker对象为线程Thread.class的属性parkBlocker
        private static void setBlocker(Thread t, Object arg) {
            // Even though volatile, hotspot doesn't need a write barrier here.
            UNSAFE.putObject(t, parkBlockerOffset, arg);
        }
    
       //唤醒给定线程;
       //1、若给定线程的许可(permit)不可用,则赋予给定线程许可;
       //2、若给定线程被park阻塞,则唤醒给定线程;
       //3、若给定线程正常运行,则本次unpark会保证下次针对该线程的park不会阻塞该线程
       //4、当线程还未启动,umpark操作不会有任何作用
        public static void unpark(Thread thread) {
            if (thread != null)
                UNSAFE.unpark(thread);
        }
    
        //阻塞当前当前线程,直到获得许可;
        //当许可可用时,park会立即返回;
        //当许可不可用时,当前线程将会阻塞,直到以下事件发生
            //1、其他线程调用unpark唤醒此线程
            //2、其他线程调用Thread#interrupt中断线程
            //3、调用应不可知错误返回
        //park方法不会报告是什么原因导致的调用返回
        //调用者需在返回时自行检查是什么条件导致调用返回
        public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, 0L);
            setBlocker(t, null);
        }
    
       //功能与park基本相同,此方法添加阻塞时间参数
        public static void parkNanos(Object blocker, long nanos) {
            if (nanos > 0) {
                Thread t = Thread.currentThread();
                setBlocker(t, blocker);
                UNSAFE.park(false, nanos);
                setBlocker(t, null);
            }
        }
        //功能与park基本相同,此方法添加阻塞到某个时间点的参数
        public static void parkUntil(Object blocker, long deadline) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(true, deadline);
            setBlocker(t, null);
        }
    
        //返回给定线程的parkBlocker参数
        public static Object getBlocker(Thread t) {
            if (t == null)
                throw new NullPointerException();
            return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
        }
    
        
        //
        public static void park() {
            UNSAFE.park(false, 0L);
        }
    
       
        public static void parkNanos(long nanos) {
            if (nanos > 0)
                UNSAFE.park(false, nanos);
        }
    
        
        public static void parkUntil(long deadline) {
            UNSAFE.park(true, deadline);
        }
    
        
    
        //unsafe对象
        private static final sun.misc.Unsafe UNSAFE;
        //保存parkBlocker属性在Thread中的偏移量
        private static final long parkBlockerOffset;
        private static final long SEED;
        private static final long PROBE;
        private static final long SECONDARY;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> tk = Thread.class;
                parkBlockerOffset = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("parkBlocker"));
                SEED = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("threadLocalRandomSeed"));
                PROBE = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("threadLocalRandomProbe"));
                SECONDARY = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
            } catch (Exception ex) { throw new Error(ex); }
        }
    
    }
    

    2.5、LockSupport底层实现

    LockSupport的Park及Unpark是通过Unsafe的对应类实现的,而Unsafe的相关实现在jvm中,以下是jvm中Unsafe.Park()及Unsafe.Unpark()的cpp实现。

    UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
      HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
      EventThreadPark event;
    
      JavaThreadParkedState jtps(thread, time != 0);
      thread->parker()->park(isAbsolute != 0, time);
      if (event.should_commit()) {
        const oop obj = thread->current_park_blocker();
        if (time == 0) {
          post_thread_park_event(&event, obj, min_jlong, min_jlong);
        } else {
          if (isAbsolute != 0) {
            post_thread_park_event(&event, obj, min_jlong, time);
          } else {
            post_thread_park_event(&event, obj, time, min_jlong);
          }
        }
      }
      HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
    } UNSAFE_END
    
    UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) {
      Parker* p = NULL;
    
      if (jthread != NULL) {
        ThreadsListHandle tlh;
        JavaThread* thr = NULL;
        oop java_thread = NULL;
        (void) tlh.cv_internal_thread_to_JavaThread(jthread, &thr, &java_thread);
        if (java_thread != NULL) {
          // This is a valid oop.
          if (thr != NULL) {
            // The JavaThread is alive.
            p = thr->parker();
          }
        }
      } // ThreadsListHandle is destroyed here.
    
      // 'p' points to type-stable-memory if non-NULL. If the target
      // thread terminates before we get here the new user of this
      // Parker will get a 'spurious' unpark - which is perfectly valid.
      if (p != NULL) {
        HOTSPOT_THREAD_UNPARK((uintptr_t) p);
        p->unpark();
      }
    } UNSAFE_END
    

    而Unsafe中的相关实现是通过os_posix.cpp中的Parker::park()及Parker::unpark()实现的。

    void Parker::park(bool isAbsolute, jlong time) {
    
      // Optional fast-path check:
      // Return immediately if a permit is available.
      // We depend on Atomic::xchg() having full barrier semantics
      // since we are doing a lock-free update to _counter.
      if (Atomic::xchg(0, &_counter) > 0) return;
    
      Thread* thread = Thread::current();
      assert(thread->is_Java_thread(), "Must be JavaThread");
      JavaThread *jt = (JavaThread *)thread;
    
      // Optional optimization -- avoid state transitions if there's
      // an interrupt pending.
      if (Thread::is_interrupted(thread, false)) {
        return;
      }
    
      // Next, demultiplex/decode time arguments
      struct timespec absTime;
      if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
        return;
      }
      if (time > 0) {
        to_abstime(&absTime, time, isAbsolute, false);
      }
    
      // Enter safepoint region
      // Beware of deadlocks such as 6317397.
      // The per-thread Parker:: mutex is a classic leaf-lock.
      // In particular a thread must never block on the Threads_lock while
      // holding the Parker:: mutex.  If safepoints are pending both the
      // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
      ThreadBlockInVM tbivm(jt);
    
      // Don't wait if cannot get lock since interference arises from
      // unparking. Also re-check interrupt before trying wait.
      if (Thread::is_interrupted(thread, false) ||
          pthread_mutex_trylock(_mutex) != 0) {
        return;
      }
    
      int status;
      if (_counter > 0)  { // no wait needed
        _counter = 0;
        status = pthread_mutex_unlock(_mutex);
        assert_status(status == 0, status, "invariant");
        // Paranoia to ensure our locked and lock-free paths interact
        // correctly with each other and Java-level accesses.
        OrderAccess::fence();
        return;
      }
    
      OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
      jt->set_suspend_equivalent();
      // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
    
      assert(_cur_index == -1, "invariant");
      if (time == 0) {
        _cur_index = REL_INDEX; // arbitrary choice when not timed
        status = pthread_cond_wait(&_cond[_cur_index], _mutex);
        assert_status(status == 0, status, "cond_timedwait");
      }
      else {
        _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
        status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
        assert_status(status == 0 || status == ETIMEDOUT,
                      status, "cond_timedwait");
      }
      _cur_index = -1;
    
      _counter = 0;
      status = pthread_mutex_unlock(_mutex);
      assert_status(status == 0, status, "invariant");
      // Paranoia to ensure our locked and lock-free paths interact
      // correctly with each other and Java-level accesses.
      OrderAccess::fence();
    
      // If externally suspended while waiting, re-suspend
      if (jt->handle_special_suspend_equivalent_condition()) {
        jt->java_suspend_self();
      }
    }
    
    void Parker::unpark() {
      int status = pthread_mutex_lock(_mutex);
      assert_status(status == 0, status, "invariant");
      const int s = _counter;
      _counter = 1;
      // must capture correct index before unlocking
      int index = _cur_index;
      status = pthread_mutex_unlock(_mutex);
      assert_status(status == 0, status, "invariant");
    
      // Note that we signal() *after* dropping the lock for "immortal" Events.
      // This is safe and avoids a common class of futile wakeups.  In rare
      // circumstances this can cause a thread to return prematurely from
      // cond_{timed}wait() but the spurious wakeup is benign and the victim
      // will simply re-test the condition and re-park itself.
      // This provides particular benefit if the underlying platform does not
      // provide wait morphing.
    
      if (s < 1 && index != -1) {
        // thread is definitely parked
        status = pthread_cond_signal(&_cond[index]);
        assert_status(status == 0, status, "invariant");
      }
    }
    

    由以上源码可知,实现park及unpark的关键是Mutex互斥体、Condition信号量、_counter计数器。
    park主要流程:
    当_counter > 0,则直接调用pthread_mutex_unlock解锁并返回;
    当超时时间time>0,则调用pthread_cond_timedwait进行超时等待,直到超时时间到达;
    当超时时间time=0,则调用pthread_cond_wait等待;
    当wait返回时设置_counter = 0,并调用pthread_mutex_unlock解锁;
    unpark主要流程:
    调用pthread_mutex_lock获取锁,设置_counter=1,调用pthread_mutex_unlock解锁;
    若_counter的原值等于0,则调用pthread_cond_signal进行通知处理;

    相关文章

      网友评论

        本文标题:java锁(3)Lock接口及LockSupport工具详解

        本文链接:https://www.haomeiwen.com/subject/grcdlctx.html