美文网首页
Java LockSupport

Java LockSupport

作者: yunpxu | 来源:发表于2018-12-14 15:28 被阅读0次

    LockSupport APIs

    1. unpark(Thread thread)

    • Create a new thread to unpark thread t after seconds passed.
    /**
     * Create a new thread to unpark thread t after seconds passed
     * @param t
     * @param seconds
     */
    private static void unpark(Thread t, long seconds) {
        new Thread(() -> {
            try {
                Thread.sleep(seconds * _SEC_MS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            LockSupport.unpark(t);
        }).start();
    }
    

    2. park() vs park(Object blocker)

    • t1 is blocked by LockSupport.park().
    • t2 is blocked by LockSupport.park(blocker).
    • unpark t1 and t2 after 60 seconds.
    private static void parkVsParkBlocker() {
        Thread t1 = new Thread(() -> {
            LockSupport.park();
        }, "t1");
        t1.start();
    
        Object blocker = new Object();
        Thread t2 = new Thread(() -> {
            LockSupport.park(blocker);
        }, "t2");
        t2.start();
    
        LockSupport.getBlocker(t2);
    
        unpark(t1, 60);
        unpark(t2, 60);
    }
    

    Print java stack trace of a given jvm process.
    jstack `jps -l | grep LockSupport | awk '{print $1}'`

    park vs park with blocker

    3. getBlocker(Thread t)

    LockSupport.getBlocker(t2);

    4. parkNanos(long nanos) vs parkUntil(long deadline)

    • Park thread t1, t2, t3, t4 for 5 seconds, unpark t2, t4 after 2 seconds.
    • Thread t1, t2 parkNanos(long nanos).
      Thread t1 = park(5, 5, ParkMethod.PARK_NANOS);
      Thread t2 = park(5, 2, ParkMethod.PARK_NANOS);
      unpark(t2, 2);
    • Thread t3, t4 parkUntil(long deadline).
      Thread t3 = park(5, 5, ParkMethod.PARK_UNTIL);
      Thread t4 = park(5, 2, ParkMethod.PARK_UNTIL);
      unpark(t4, 2);
    • Thread t1, t3 are blocked for 5 seconds, since no other thread unpark t1, t3.
    • Thread t2, t4 are blocked for 2 seconds, since other thread unpark t2, t4 after 2 seconds.
    private static long _SEC_NS = 1000000000;
    private static long _SEC_MS = 1000;
    
    enum ParkMethod {
        PARK_NANOS, PARK_UNTIL;
    }
    
    private static Thread park(long parkSeconds, long actualParkSeconds, ParkMethod parkMethod) {
        Thread t = new Thread(() -> {
            long start = System.currentTimeMillis();
            switch (parkMethod) {
                case PARK_NANOS:
                    LockSupport.parkNanos(parkSeconds * _SEC_NS);
                    break;
                case PARK_UNTIL:
                    LockSupport.parkUntil(start + parkSeconds * _SEC_MS);
                    break;
                default:
                    break;
            }
            assert (System.currentTimeMillis() - start) / _SEC_MS == actualParkSeconds;
        });
        t.start();
        return t;
    }
    

    5. parkNanos(Object blocker, long nanos) vs parkUntil(Object blocker, long deadline)

    • Thread t1, t2 are blocked for object blocker for 60 seconds
    private static void parkNanosVsParkUtilBlocker() {
        Object blocker = new Object();
    
        Thread t1 = new Thread(() -> {
            LockSupport.parkNanos(blocker, 60 * _SEC_NS);
        });
        Thread t2 = new Thread(() -> {
            LockSupport.parkUntil(blocker, System.currentTimeMillis() + 60 * _SEC_MS);
        });
        t1.start();
        t2.start();
    }
    

    Source code

    LockSupport

    LockSupport.java

    Delegation of method park() and unpark() in Unsafe.

    private static final Unsafe U = Unsafe.getUnsafe();
    
    public static void park() {
        U.park(false, 0L);
    }
    
    public static void unpark(Thread thread) {
        if (thread != null)
            U.unpark(thread);
    }
    

    Unsafe

    Unsafe.java

    Call native method.

    public native void park(boolean isAbsolute, long time);
    
    public native void unpark(Object thread);
    

    unsafe.cpp

    park

    UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
      HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
      EventThreadPark event;
    
      JavaThreadParkedState jtps(thread, time != 0);
      thread->parker()->park(isAbsolute != 0, time);
      if (event.should_commit()) {
        const oop obj = thread->current_park_blocker();
        if (time == 0) {
          post_thread_park_event(&event, obj, min_jlong, min_jlong);
        } else {
          if (isAbsolute != 0) {
            post_thread_park_event(&event, obj, min_jlong, time);
          } else {
            post_thread_park_event(&event, obj, time, min_jlong);
          }
        }
      }
      HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
    } UNSAFE_END
    

    展开宏定义

    extern "C" {
        static void JNICALL Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time) {
            //UNSAFE_ENTRY JVM_ENTRY begin
            JavaThread *thread = JavaThread::thread_from_jni_environment(env);
            ThreadInVMfromNative __tiv(thread);
            debug_only(VMNativeEntryWrapper __vew;)
            VM_ENTRY_BASE(result_type, header, thread)
            //UNSAFE_ENTRY JVM_ENTRY end
    
            {
                //HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
                EventThreadPark event;
    
                JavaThreadParkedState jtps(thread, time != 0);
                thread->parker()->park(isAbsolute != 0, time);
                if (event.should_commit()) {
                    const oop obj = thread->current_park_blocker();
                    if (time == 0) {
                        post_thread_park_event(&event, obj, min_jlong, min_jlong);
                    } else {
                        if (isAbsolute != 0) {
                            post_thread_park_event(&event, obj, min_jlong, time);
                        } else {
                            post_thread_park_event(&event, obj, time, min_jlong);
                        }
                    }
                }
                //HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
            }
        }
    }
    
    1. Get thread object from jni environment
      JavaThread *thread = JavaThread::thread_from_jni_environment(env);
    2. Create JavaThreadParkedState object
      JavaThreadParkedState jtps(thread, time != 0);
      • save thread's old state
      • set thread's state to PARKED or PARKED_TIMED
    3. Call park() on thread's _parker object
      thread->parker()->park(isAbsolute != 0, time);

    unpark

    UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) {
      Parker* p = NULL;
    
      if (jthread != NULL) {
        ThreadsListHandle tlh;
        JavaThread* thr = NULL;
        oop java_thread = NULL;
        (void) tlh.cv_internal_thread_to_JavaThread(jthread, &thr, &java_thread);
        if (java_thread != NULL) {
          // This is a valid oop.
          jlong lp = java_lang_Thread::park_event(java_thread);
          if (lp != 0) {
            // This cast is OK even though the jlong might have been read
            // non-atomically on 32bit systems, since there, one word will
            // always be zero anyway and the value set is always the same
            p = (Parker*)addr_from_java(lp);
          } else {
            // Not cached in the java.lang.Thread oop yet (could be an
            // older version of library).
            if (thr != NULL) {
              // The JavaThread is alive.
              p = thr->parker();
              if (p != NULL) {
                // Cache the Parker in the java.lang.Thread oop for next time.
                java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
              }
            }
          }
        }
      } // ThreadsListHandle is destroyed here.
    
      if (p != NULL) {
        HOTSPOT_THREAD_UNPARK((uintptr_t) p);
        p->unpark();
      }
    } UNSAFE_END
    

    展开宏定义

    extern "C" {
        static void JNICALL Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread) {
            //UNSAFE_ENTRY JVM_ENTRY begin
            JavaThread *thread = JavaThread::thread_from_jni_environment(env);
            ThreadInVMfromNative __tiv(thread);
            debug_only(VMNativeEntryWrapper __vew;)
            VM_ENTRY_BASE(result_type, header, thread)
            //UNSAFE_ENTRY JVM_ENTRY end
            {
                Parker *p = NULL;
    
                if (jthread != NULL) {
                    ThreadsListHandle tlh;
                    JavaThread *thr = NULL;
                    oop java_thread = NULL;
                    (void)tlh.cv_internal_thread_to_JavaThread(jthread, &thr, &java_thread);
                    if (java_thread != NULL) {
                        // This is a valid oop.
                        jlong lp = java_lang_Thread::park_event(java_thread);
                        if (lp != 0) {
                            // This cast is OK even though the jlong might have been read
                            // non-atomically on 32bit systems, since there, one word will
                            // always be zero anyway and the value set is always the same
                            p = (Parker *)addr_from_java(lp);
                        } else {
                            // Not cached in the java.lang.Thread oop yet (could be an
                            // older version of library).
                            if (thr != NULL) {
                                // The JavaThread is alive.
                                p = thr->parker();
                                if (p != NULL) {
                                    // Cache the Parker in the java.lang.Thread oop for next time.
                                    java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
                                }
                            }
                        }
                    }
                } // ThreadsListHandle is destroyed here.
    
                if (p != NULL) {
                    HOTSPOT_THREAD_UNPARK((uintptr_t)p);
                    p->unpark();
                }
            }
        }
    }
    
    1. Get thread object from jni environment
      JavaThread *thread = JavaThread::thread_from_jni_environment(env);
    2. Get Parker object from park_event or JavaThread
      Parker *p = (Parker *)addr_from_java(lp);
      Parker *p = thr->parker();
    3. Call unpark() on Parker object
      p->unpark();

    Thread

    thread.hpp

      // JSR166 per-thread parker
     private:
      Parker*    _parker;
     public:
      Parker*     parker() { return _parker; }
    

    thread.cpp

    void JavaThread::initialize() {
      ...
      _parker = Parker::Allocate(this);
    }
    
    1. per-thread Parker object
    2. _parker is initialized by JavaThread::initialize()

    Parker

    park.hpp

    class Parker : public os::PlatformParker {
    private:
      volatile int _counter ;
      Parker * FreeNext ;
      JavaThread * AssociatedWith ; // Current association
    
    public:
      Parker() : PlatformParker() {
        _counter       = 0 ;
        FreeNext       = NULL ;
        AssociatedWith = NULL ;
      }
    protected:
      ~Parker() { ShouldNotReachHere(); }
    public:
      // For simplicity of interface with Java, all forms of park (indefinite,
      // relative, and absolute) are multiplexed into one call.
      void park(bool isAbsolute, jlong time);
      void unpark();
    
      // Lifecycle operators
      static Parker * Allocate (JavaThread * t) ;
      static void Release (Parker * e) ;
    private:
      static Parker * volatile FreeList ;
      static volatile int ListLock ;
    
    };
    
    1. Parker extends os::PlatformParker
    2. Default value of volatile _counter is 0

    PlatformParker

    os_posix.cpp

    // JSR166 support
     os::PlatformParker::PlatformParker() {
      int status;
      status = pthread_cond_init(&_cond[REL_INDEX], _condAttr);
      assert_status(status == 0, status, "cond_init rel");
      status = pthread_cond_init(&_cond[ABS_INDEX], NULL);
      assert_status(status == 0, status, "cond_init abs");
      status = pthread_mutex_init(_mutex, _mutexAttr);
      assert_status(status == 0, status, "mutex_init");
      _cur_index = -1; // mark as unused
    }
    
    1. Initialize condition variable _cond[0], used when isAbsolute is false
      pthread_cond_init(&_cond[REL_INDEX], _condAttr);
    2. Initialize condition variable _cond[1], used when isAbsolute is true
      pthread_cond_init(&_cond[ABS_INDEX], NULL);
    3. Initialize a mutex _mutex
      pthread_mutex_init(_mutex, _mutexAttr);

    park

    // Parker::park decrements count if > 0, else does a condvar wait.  Unpark
    // sets count to 1 and signals condvar.  Only one thread ever waits
    // on the condvar. Contention seen when trying to park implies that someone
    // is unparking you, so don't wait. And spurious returns are fine, so there
    // is no need to track notifications.
    
    void Parker::park(bool isAbsolute, jlong time) {
    
      // Optional fast-path check:
      // Return immediately if a permit is available.
      // We depend on Atomic::xchg() having full barrier semantics
      // since we are doing a lock-free update to _counter.
      if (Atomic::xchg(0, &_counter) > 0) return;
    
      Thread* thread = Thread::current();
      assert(thread->is_Java_thread(), "Must be JavaThread");
      JavaThread *jt = (JavaThread *)thread;
    
      // Optional optimization -- avoid state transitions if there's
      // an interrupt pending.
      if (Thread::is_interrupted(thread, false)) {
        return;
      }
    
      // Next, demultiplex/decode time arguments
      struct timespec absTime;
      if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
        return;
      }
      if (time > 0) {
        to_abstime(&absTime, time, isAbsolute);
      }
    
      // Enter safepoint region
      // Beware of deadlocks such as 6317397.
      // The per-thread Parker:: mutex is a classic leaf-lock.
      // In particular a thread must never block on the Threads_lock while
      // holding the Parker:: mutex.  If safepoints are pending both the
      // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
      ThreadBlockInVM tbivm(jt);
    
      // Don't wait if cannot get lock since interference arises from
      // unparking. Also re-check interrupt before trying wait.
      if (Thread::is_interrupted(thread, false) ||
          pthread_mutex_trylock(_mutex) != 0) {
        return;
      }
    
      int status;
      if (_counter > 0)  { // no wait needed
        _counter = 0;
        status = pthread_mutex_unlock(_mutex);
        assert_status(status == 0, status, "invariant");
        // Paranoia to ensure our locked and lock-free paths interact
        // correctly with each other and Java-level accesses.
        OrderAccess::fence();
        return;
      }
    
      OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
      jt->set_suspend_equivalent();
      // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
    
      assert(_cur_index == -1, "invariant");
      if (time == 0) {
        _cur_index = REL_INDEX; // arbitrary choice when not timed
        status = pthread_cond_wait(&_cond[_cur_index], _mutex);
        assert_status(status == 0, status, "cond_timedwait");
      }
      else {
        _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
        status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
        assert_status(status == 0 || status == ETIMEDOUT,
                      status, "cond_timedwait");
      }
      _cur_index = -1;
    
      _counter = 0;
      status = pthread_mutex_unlock(_mutex);
      assert_status(status == 0, status, "invariant");
      // Paranoia to ensure our locked and lock-free paths interact
      // correctly with each other and Java-level accesses.
      OrderAccess::fence();
    
      // If externally suspended while waiting, re-suspend
      if (jt->handle_special_suspend_equivalent_condition()) {
        jt->java_suspend_self();
      }
    }
    
    1. if (Atomic::xchg(0, &_counter) > 0) return;
      Performs atomic exchange of _counter to value 0, return previous value of _counter.

      • If previous value of _counter > 0, stop park.
        This happens in case we call unpark before park.
        LockSupport.unpark(Thread.currentThread()); LockSupport.park();
      • If previous value of _counter = 0, continue park.
    2. if (Thread::is_interrupted(thread, false)) return;
      Check if current thread is interrupted.

      • If current thread is interrupted, stop park.
        This happens in case we interrupt current thread before park.
        Thread.currentThread().interrupt(); LockSupport.park();
      • Else continue park.
    3. Demultiplex/decode time arguments

      • time < 0, stop park.
      • time = 0 and isAbsolute = true, stop park.
      • time = 0 and isAbsolute = false, park forever until unpark.
      • time > 0, extract absTime.
        to_abstime(&absTime, time, isAbsolute);
        absTime.tv_sec and absTime.tv_usec represent seconds and microseconds since the Epoch1970-01-01 00:00:00 +0000 (UTC).
    4. ThreadBlockInVM tbivm(jt);
      Transit java thread status from running (_thread_in_vm) to blocked(_thread_blocked).

    5. if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) return;
      Re-check if current thread is interrupted, and try to acquire lock on _mutex.

      • If current thread is interrupted, stop park.
        This happens while t2 interrupted t1, and then t1 executed to 5.
        Thread t1 = new Thread(() -> LockSupport.park());
        Thread t2 = new Thread(() -> t1.interrupt());
        t1.start();
        t2.start();
      • If current thread can't acquire lock on _mutex, stop park.
        This happens while t2 acquired lock on _mutex and then t1 executed to 5.
        Thread t1 = new Thread(() -> LockSupport.park());
        Thread t2 = new Thread(() -> LockSupport.unpark(t1););
        t1.start();
        t2.start();
      • Else acquire lock on _mutex and continue park.
    6. if (_counter > 0) {_counter = 0; pthread_mutex_unlock(_mutex); return;}

      • If _counter > 0, stop park.
        This happens while t2 finished unpark t1, and t1 executed to 6.
        Thread t1 = new Thread(() -> LockSupport.park());
        Thread t2 = new Thread(() -> LockSupport.unpark(t1););
        t1.start();
        t2.start();
    7. OSThreadWaitState osts(thread->osthread(), false);
      Set os thread state to wait.

    8. pthread_cond_wait and pthread_cond_timedwait
      Block current thread

      • time = 0
        pthread_cond_wait(&_cond[REL_INDEX], _mutex);
      • time > 0
        pthread_cond_timedwait(&_cond[isAbsolute ? ABS_INDEX : REL_INDEX], _mutex, &absTime);
    9. _counter = 0; pthread_mutex_unlock(_mutex);
      After thread is unblocked by unpark, set _counter = 0 and release _mutex lock.

    unpark

    void Parker::unpark() {
      int status = pthread_mutex_lock(_mutex);
      assert_status(status == 0, status, "invariant");
      const int s = _counter;
      _counter = 1;
      // must capture correct index before unlocking
      int index = _cur_index;
      status = pthread_mutex_unlock(_mutex);
      assert_status(status == 0, status, "invariant");
    
      // Note that we signal() *after* dropping the lock for "immortal" Events.
      // This is safe and avoids a common class of futile wakeups.  In rare
      // circumstances this can cause a thread to return prematurely from
      // cond_{timed}wait() but the spurious wakeup is benign and the victim
      // will simply re-test the condition and re-park itself.
      // This provides particular benefit if the underlying platform does not
      // provide wait morphing.
    
      if (s < 1 && index != -1) {
        // thread is definitely parked
        status = pthread_cond_signal(&_cond[index]);
        assert_status(status == 0, status, "invariant");
      }
    }
    
    1. pthread_mutex_lock(_mutex);
      Acquire mutex lock on _mutex.
    2. const int s = _counter; _counter = 1; int index = _cur_index;
      • s = previous value of _counter.
      • _counter = 1.
      • _cur_index to get condition variable for unblocking.
    3. pthread_mutex_unlock(_mutex);
      Release mutex lock on _mutex.
    4. if (s < 1 && index != -1) {pthread_cond_signal(&_cond[index]);}
      If previous value of _counter < 1 and _cur_index != -1, unblock thread.

    Summary

    • park set _counter to 0, block thread through pthread_cond_wait/pthread_cond_timedwait.
    • unpark set _counter to 1, unblock thread through pthread_cond_signal.

    A call to park will return immediately if the permit is available(_counter = 1), consuming it in the process(_counter = 0); otherwise(_counter = 0) it may block. A call to unpark makes the permit available(_counter = 1), if it was not already available.

    Reference

    LockSupport doc
    cpp reference
    open jdk code search
    open jdk github
    condition-wait-signal-multi-threading

    相关文章

      网友评论

          本文标题:Java LockSupport

          本文链接:https://www.haomeiwen.com/subject/agsihqtx.html