LockSupport APIs
1. unpark(Thread thread)
- Create a new thread that unparks thread t after the given number of seconds has passed.
/**
 * Starts a helper thread that calls {@code LockSupport.unpark(t)} once
 * {@code seconds} seconds have elapsed.
 *
 * @param t       the thread to unpark
 * @param seconds delay, in seconds, before {@code t} is unparked
 */
private static void unpark(Thread t, long seconds) {
    new Thread(() -> {
        try {
            Thread.sleep(seconds * _SEC_MS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status instead of silently dropping it;
            // t is still unparked below so it is not left blocked.
            Thread.currentThread().interrupt();
        }
        LockSupport.unpark(t);
    }).start();
}
2. park() vs park(Object blocker)
- t1 is blocked by LockSupport.park().
- t2 is blocked by LockSupport.park(blocker).
- unpark t1 and t2 after 60 seconds.
/**
 * Demo: thread "t1" blocks in LockSupport.park(), thread "t2" blocks in
 * LockSupport.park(blocker); both are scheduled to be unparked after 60 seconds.
 */
private static void parkVsParkBlocker() {
    // t1 parks without a blocker object.
    Runnable plainPark = () -> LockSupport.park();
    Thread t1 = new Thread(plainPark, "t1");
    t1.start();

    // t2 parks and records `blocker` as the object it is blocked on.
    Object blocker = new Object();
    Runnable blockerPark = () -> LockSupport.park(blocker);
    Thread t2 = new Thread(blockerPark, "t2");
    t2.start();

    LockSupport.getBlocker(t2);

    unpark(t1, 60);
    unpark(t2, 60);
}
Print java stack trace of a given jvm process.
jstack `jps -l | grep LockSupport | awk '{print $1}'`
3. getBlocker(Thread t)
LockSupport.getBlocker(t2);
4. parkNanos(long nanos) vs parkUntil(long deadline)
- Park thread t1, t2, t3, t4 for 5 seconds, unpark t2, t4 after 2 seconds.
- Thread t1, t2 parkNanos(long nanos).
Thread t1 = park(5, 5, ParkMethod.PARK_NANOS);
Thread t2 = park(5, 2, ParkMethod.PARK_NANOS);
unpark(t2, 2);
- Thread t3, t4 parkUntil(long deadline).
Thread t3 = park(5, 5, ParkMethod.PARK_UNTIL);
Thread t4 = park(5, 2, ParkMethod.PARK_UNTIL);
unpark(t4, 2);
- Threads t1 and t3 stay blocked for the full 5 seconds, since no other thread unparks them.
- Threads t2 and t4 are blocked for only 2 seconds, since another thread unparks them after 2 seconds.
/** Nanoseconds per second; scales second counts for LockSupport.parkNanos. */
private static final long _SEC_NS = 1_000_000_000L;
/** Milliseconds per second; scales second counts for Thread.sleep and LockSupport.parkUntil. */
private static final long _SEC_MS = 1_000L;
/** Selects which timed LockSupport call the park(...) helper uses. */
enum ParkMethod {
    /** Park through LockSupport.parkNanos. */
    PARK_NANOS,
    /** Park through LockSupport.parkUntil. */
    PARK_UNTIL
}
/**
 * Starts a thread that parks itself with the selected timed method and then
 * asserts how many whole seconds it was actually blocked.
 *
 * @param parkSeconds       timeout, in seconds, passed to the park call
 * @param actualParkSeconds whole seconds the thread is expected to stay parked
 * @param parkMethod        which LockSupport call to use
 * @return the started thread
 */
private static Thread park(long parkSeconds, long actualParkSeconds, ParkMethod parkMethod) {
    Runnable parkAndCheck = () -> {
        final long begin = System.currentTimeMillis();
        switch (parkMethod) {
            case PARK_NANOS:
                // Relative timeout, in nanoseconds.
                LockSupport.parkNanos(parkSeconds * _SEC_NS);
                break;
            case PARK_UNTIL:
                // Absolute deadline, in milliseconds, derived from the start time.
                LockSupport.parkUntil(begin + parkSeconds * _SEC_MS);
                break;
            default:
                break;
        }
        // Only evaluated when assertions are enabled (-ea).
        assert (System.currentTimeMillis() - begin) / _SEC_MS == actualParkSeconds;
    };
    Thread worker = new Thread(parkAndCheck);
    worker.start();
    return worker;
}
5. parkNanos(Object blocker, long nanos) vs parkUntil(Object blocker, long deadline)
- Threads t1 and t2 are both blocked on the blocker object for 60 seconds.
/**
 * Demo: starts two threads that block on the same blocker object for 60 seconds,
 * one through parkNanos(blocker, nanos), the other through parkUntil(blocker, deadline).
 */
private static void parkNanosVsParkUtilBlocker() {
    final Object blocker = new Object();

    // Relative 60-second timeout.
    Runnable relativePark = () -> LockSupport.parkNanos(blocker, 60 * _SEC_NS);
    // Absolute deadline 60 seconds after the moment the thread runs.
    Runnable deadlinePark = () -> LockSupport.parkUntil(blocker, System.currentTimeMillis() + 60 * _SEC_MS);

    Thread t1 = new Thread(relativePark);
    Thread t2 = new Thread(deadlinePark);
    t1.start();
    t2.start();
}
Source code
LockSupport
Delegation of method park() and unpark() in Unsafe.
// Unsafe instance that LockSupport forwards its park/unpark calls to.
private static final Unsafe U = Unsafe.getUnsafe();
// Parks through Unsafe.park with isAbsolute = false and time = 0.
public static void park() {
U.park(false, 0L);
}
// Unparks the given thread through Unsafe.unpark; a null thread is ignored.
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
Unsafe
Call native method.
// Native park entry point; isAbsolute selects how `time` is interpreted
// (LockSupport.park() calls it as park(false, 0L)).
public native void park(boolean isAbsolute, long time);
// Native unpark entry point; the target thread is passed as an Object.
public native void unpark(Object thread);
park
// Native implementation behind Unsafe.park(isAbsolute, time): parks the calling
// thread on its own Parker and, if the EventThreadPark event should be
// committed, posts it together with the current park blocker.
// `thread` is introduced by the UNSAFE_ENTRY macro.
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
EventThreadPark event;
// Second argument is true for a timed park (time != 0).
JavaThreadParkedState jtps(thread, time != 0);
// The actual blocking happens in Parker::park.
thread->parker()->park(isAbsolute != 0, time);
if (event.should_commit()) {
const oop obj = thread->current_park_blocker();
if (time == 0) {
// Untimed park: min_jlong is passed in both time slots.
post_thread_park_event(&event, obj, min_jlong, min_jlong);
} else {
if (isAbsolute != 0) {
// Absolute park: `time` goes in the last argument.
post_thread_park_event(&event, obj, min_jlong, time);
} else {
// Relative park: `time` goes in the third argument.
post_thread_park_event(&event, obj, time, min_jlong);
}
}
}
HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
} UNSAFE_END
展开宏定义
// Hand-expanded form of the UNSAFE_ENTRY/UNSAFE_END wrapper around Unsafe_Park.
// VM_ENTRY_BASE is left unexpanded and the HOTSPOT_THREAD_PARK_BEGIN/END lines
// are commented out; the body is otherwise the same as the macro version.
extern "C" {
static void JNICALL Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time) {
//UNSAFE_ENTRY JVM_ENTRY begin
// The macro prologue derives the current JavaThread from the JNI environment.
JavaThread *thread = JavaThread::thread_from_jni_environment(env);
ThreadInVMfromNative __tiv(thread);
debug_only(VMNativeEntryWrapper __vew;)
VM_ENTRY_BASE(result_type, header, thread)
//UNSAFE_ENTRY JVM_ENTRY end
{
//HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
EventThreadPark event;
// Second argument is true for a timed park (time != 0).
JavaThreadParkedState jtps(thread, time != 0);
// The actual blocking happens in Parker::park.
thread->parker()->park(isAbsolute != 0, time);
if (event.should_commit()) {
const oop obj = thread->current_park_blocker();
if (time == 0) {
// Untimed park: min_jlong is passed in both time slots.
post_thread_park_event(&event, obj, min_jlong, min_jlong);
} else {
if (isAbsolute != 0) {
// Absolute park: `time` goes in the last argument.
post_thread_park_event(&event, obj, min_jlong, time);
} else {
// Relative park: `time` goes in the third argument.
post_thread_park_event(&event, obj, time, min_jlong);
}
}
}
//HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
}
}
}
- Get thread object from jni environment
JavaThread *thread = JavaThread::thread_from_jni_environment(env);
- Create JavaThreadParkedState object
JavaThreadParkedState jtps(thread, time != 0);
- save thread's old state
- set thread's state to PARKED or PARKED_TIMED
- Call park() on thread's _parker object
thread->parker()->park(isAbsolute != 0, time);
unpark
// Native implementation behind Unsafe.unpark(thread): looks up the target
// thread's Parker (from the value cached in the java.lang.Thread oop, or from
// the live JavaThread) and calls unpark() on it. Does nothing if jthread is
// NULL or no Parker can be found.
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) {
Parker* p = NULL;
if (jthread != NULL) {
ThreadsListHandle tlh;
JavaThread* thr = NULL;
oop java_thread = NULL;
// Fills in thr and java_thread; the return value is deliberately ignored.
(void) tlh.cv_internal_thread_to_JavaThread(jthread, &thr, &java_thread);
if (java_thread != NULL) {
// This is a valid oop.
jlong lp = java_lang_Thread::park_event(java_thread);
if (lp != 0) {
// This cast is OK even though the jlong might have been read
// non-atomically on 32bit systems, since there, one word will
// always be zero anyway and the value set is always the same
p = (Parker*)addr_from_java(lp);
} else {
// Not cached in the java.lang.Thread oop yet (could be an
// older version of library).
if (thr != NULL) {
// The JavaThread is alive.
p = thr->parker();
if (p != NULL) {
// Cache the Parker in the java.lang.Thread oop for next time.
java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
}
}
}
}
} // ThreadsListHandle is destroyed here.
// The unpark itself is issued after the ThreadsListHandle scope has ended.
if (p != NULL) {
HOTSPOT_THREAD_UNPARK((uintptr_t) p);
p->unpark();
}
} UNSAFE_END
展开宏定义
// Hand-expanded form of the UNSAFE_ENTRY/UNSAFE_END wrapper around
// Unsafe_Unpark; VM_ENTRY_BASE is left unexpanded. The body is otherwise the
// same as the macro version.
extern "C" {
static void JNICALL Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread) {
//UNSAFE_ENTRY JVM_ENTRY begin
// `thread` is the calling thread, not the thread being unparked.
JavaThread *thread = JavaThread::thread_from_jni_environment(env);
ThreadInVMfromNative __tiv(thread);
debug_only(VMNativeEntryWrapper __vew;)
VM_ENTRY_BASE(result_type, header, thread)
//UNSAFE_ENTRY JVM_ENTRY end
{
Parker *p = NULL;
if (jthread != NULL) {
ThreadsListHandle tlh;
JavaThread *thr = NULL;
oop java_thread = NULL;
// Fills in thr and java_thread; the return value is deliberately ignored.
(void)tlh.cv_internal_thread_to_JavaThread(jthread, &thr, &java_thread);
if (java_thread != NULL) {
// This is a valid oop.
jlong lp = java_lang_Thread::park_event(java_thread);
if (lp != 0) {
// This cast is OK even though the jlong might have been read
// non-atomically on 32bit systems, since there, one word will
// always be zero anyway and the value set is always the same
p = (Parker *)addr_from_java(lp);
} else {
// Not cached in the java.lang.Thread oop yet (could be an
// older version of library).
if (thr != NULL) {
// The JavaThread is alive.
p = thr->parker();
if (p != NULL) {
// Cache the Parker in the java.lang.Thread oop for next time.
java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
}
}
}
}
} // ThreadsListHandle is destroyed here.
// The unpark itself is issued after the ThreadsListHandle scope has ended.
if (p != NULL) {
HOTSPOT_THREAD_UNPARK((uintptr_t)p);
p->unpark();
}
}
}
}
- Get thread object from jni environment
JavaThread *thread = JavaThread::thread_from_jni_environment(env);
- Get Parker object from park_event or JavaThread
Parker *p = (Parker *)addr_from_java(lp);
Parker *p = thr->parker();
- Call unpark() on Parker object
p->unpark();
Thread
// JSR166 per-thread parker
private:
Parker* _parker;
public:
// Accessor for this thread's Parker.
Parker* parker() { return _parker; }
// Excerpt of JavaThread::initialize(): the elided part ("...") is not shown;
// the Parker for this thread is obtained from Parker::Allocate.
void JavaThread::initialize() {
...
_parker = Parker::Allocate(this);
}
- per-thread Parker object
- _parker is initialized by JavaThread::initialize()
Parker
// Per-thread parking primitive used by Unsafe park/unpark. Builds on
// os::PlatformParker, which supplies the platform mutex and condition variables.
class Parker : public os::PlatformParker {
private:
// Permit flag: Parker::park resets it to 0, Parker::unpark sets it to 1.
volatile int _counter ;
// NOTE(review): presumably links Parkers on FreeList -- list code not shown here.
Parker * FreeNext ;
JavaThread * AssociatedWith ; // Current association
public:
// Starts with no permit and no associated thread.
Parker() : PlatformParker() {
_counter = 0 ;
FreeNext = NULL ;
AssociatedWith = NULL ;
}
protected:
// Destruction is not expected: the body is ShouldNotReachHere().
~Parker() { ShouldNotReachHere(); }
public:
// For simplicity of interface with Java, all forms of park (indefinite,
// relative, and absolute) are multiplexed into one call.
void park(bool isAbsolute, jlong time);
void unpark();
// Lifecycle operators
static Parker * Allocate (JavaThread * t) ;
static void Release (Parker * e) ;
private:
// NOTE(review): presumably the free list used by Allocate/Release and its
// lock -- their definitions are not shown here.
static Parker * volatile FreeList ;
static volatile int ListLock ;
};
- Parker extends os::PlatformParker
- Default value of volatile _counter is 0
PlatformParker
// JSR166 support
// Constructor: initializes the two condition variables and the mutex that
// Parker::park / Parker::unpark operate on, and marks no condvar as in use.
os::PlatformParker::PlatformParker() {
int status;
// Condvar for REL_INDEX waits, created with the _condAttr attributes.
status = pthread_cond_init(&_cond[REL_INDEX], _condAttr);
assert_status(status == 0, status, "cond_init rel");
// Condvar for ABS_INDEX waits, created with default attributes (NULL).
status = pthread_cond_init(&_cond[ABS_INDEX], NULL);
assert_status(status == 0, status, "cond_init abs");
status = pthread_mutex_init(_mutex, _mutexAttr);
assert_status(status == 0, status, "mutex_init");
_cur_index = -1; // mark as unused
}
- Initialize condition variable _cond[0], used when isAbsolute is false
pthread_cond_init(&_cond[REL_INDEX], _condAttr);
- Initialize condition variable _cond[1], used when isAbsolute is true
pthread_cond_init(&_cond[ABS_INDEX], NULL);
- Initialize a mutex _mutex
pthread_mutex_init(_mutex, _mutexAttr);
park
// Parker::park decrements count if > 0, else does a condvar wait. Unpark
// sets count to 1 and signals condvar. Only one thread ever waits
// on the condvar. Contention seen when trying to park implies that someone
// is unparking you, so don't wait. And spurious returns are fine, so there
// is no need to track notifications.
//
// isAbsolute: when true, `time` is an absolute deadline (waits on ABS_INDEX);
//             when false, `time` is relative (waits on REL_INDEX).
// time:       0 with isAbsolute == false waits without timeout; a negative
//             value, or 0 with isAbsolute == true, returns without waiting.
void Parker::park(bool isAbsolute, jlong time) {
// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// Optional optimization -- avoid state transitions if there's
// an interrupt pending.
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
struct timespec absTime;
if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
return;
}
// absTime is only filled in (and only used below) for timed parks.
if (time > 0) {
to_abstime(&absTime, time, isAbsolute);
}
// Enter safepoint region
// Beware of deadlocks such as 6317397.
// The per-thread Parker:: mutex is a classic leaf-lock.
// In particular a thread must never block on the Threads_lock while
// holding the Parker:: mutex. If safepoints are pending both
// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
ThreadBlockInVM tbivm(jt);
// Don't wait if cannot get lock since interference arises from
// unparking. Also re-check interrupt before trying wait.
if (Thread::is_interrupted(thread, false) ||
pthread_mutex_trylock(_mutex) != 0) {
return;
}
int status;
// Re-check the permit now that _mutex is held.
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
return;
}
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
assert(_cur_index == -1, "invariant");
// _cur_index records which condvar is being waited on so that unpark()
// can signal the matching one.
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait(&_cond[_cur_index], _mutex);
// NOTE(review): the message says "cond_timedwait" although this branch
// calls pthread_cond_wait.
assert_status(status == 0, status, "cond_timedwait");
}
else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
assert_status(status == 0 || status == ETIMEDOUT,
status, "cond_timedwait");
}
// No longer waiting: reset the index and consume the permit before unlocking.
_cur_index = -1;
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}
-
if (Atomic::xchg(0, &_counter) > 0) return;
Atomically exchanges _counter with 0 and returns the previous value of _counter.
- If previous value of _counter > 0, stop park.
This happens in case we call unpark before park.
LockSupport.unpark(Thread.currentThread()); LockSupport.park();
- If previous value of _counter = 0, continue park.
- If previous value of _counter > 0, stop park.
-
if (Thread::is_interrupted(thread, false)) return;
Check whether the current thread is interrupted.
- If current thread is interrupted, stop park.
This happens in case we interrupt current thread before park.
Thread.currentThread().interrupt(); LockSupport.park();
- Else continue park.
- If current thread is interrupted, stop park.
-
Demultiplex/decode time arguments
- time < 0, stop park.
- time = 0 and isAbsolute = true, stop park.
- time = 0 and isAbsolute = false, park forever until unpark.
- time > 0, extract absTime.
to_abstime(&absTime, time, isAbsolute);
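absTime is a struct timespec: absTime.tv_sec and absTime.tv_nsec represent seconds and nanoseconds since the Epoch, 1970-01-01 00:00:00 +0000 (UTC). (For relative waits the reference clock may differ by platform, e.g. a monotonic clock; see to_abstime.)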
-
ThreadBlockInVM tbivm(jt);
Transition the Java thread state from running (_thread_in_vm) to blocked (_thread_blocked).
-
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) return;
Re-check whether the current thread is interrupted, and try to acquire the lock on _mutex.
- If current thread is interrupted, stop park.
This happens when t2 has already interrupted t1 by the time t1 reaches step 5.
Thread t1 = new Thread(() -> LockSupport.park());
Thread t2 = new Thread(() -> t1.interrupt());
t1.start();
t2.start();
- If current thread can't acquire lock on _mutex, stop park.
This happens while t2 acquired lock on _mutex and then t1 executed to 5.
// t1 parks while t2 concurrently unparks it; the two calls may race.
Thread t1 = new Thread(() -> LockSupport.park());
Thread t2 = new Thread(() -> LockSupport.unpark(t1));
t1.start();
t2.start();
- Else acquire lock on _mutex and continue park.
- If current thread is interrupted, stop park.
-
if (_counter > 0) {_counter = 0; pthread_mutex_unlock(_mutex); return;}
- If _counter > 0, stop park.
This happens while t2 finished unpark t1, and t1 executed to 6.
// t1 parks while t2 concurrently unparks it; the two calls may race.
Thread t1 = new Thread(() -> LockSupport.park());
Thread t2 = new Thread(() -> LockSupport.unpark(t1));
t1.start();
t2.start();
- If _counter > 0, stop park.
-
OSThreadWaitState osts(thread->osthread(), false);
Set os thread state to wait. -
pthread_cond_wait and pthread_cond_timedwait
Block the current thread.
- time = 0
pthread_cond_wait(&_cond[REL_INDEX], _mutex);
- time > 0
pthread_cond_timedwait(&_cond[isAbsolute ? ABS_INDEX : REL_INDEX], _mutex, &absTime);
- time = 0
-
_counter = 0; pthread_mutex_unlock(_mutex);
After thread is unblocked by unpark, set _counter = 0 and release _mutex lock.
unpark
// Makes the permit available (_counter = 1) and, if the owning thread appears
// to be waiting on one of the condvars, signals that condvar.
void Parker::unpark() {
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "invariant");
// Remember the previous permit value, then publish the permit under the lock.
const int s = _counter;
_counter = 1;
// must capture correct index before unlocking
int index = _cur_index;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");
// Note that we signal() *after* dropping the lock for "immortal" Events.
// This is safe and avoids a common class of futile wakeups. In rare
// circumstances this can cause a thread to return prematurely from
// cond_{timed}wait() but the spurious wakeup is benign and the victim
// will simply re-test the condition and re-park itself.
// This provides particular benefit if the underlying platform does not
// provide wait morphing.
// Signal only if there was no permit before (s < 1) and park() had recorded
// a condvar index (index != -1).
if (s < 1 && index != -1) {
// thread is definitely parked
status = pthread_cond_signal(&_cond[index]);
assert_status(status == 0, status, "invariant");
}
}
-
pthread_mutex_lock(_mutex);
Acquire mutex lock on _mutex. -
const int s = _counter; _counter = 1; int index = _cur_index;
- s = previous value of _counter.
- _counter = 1.
- _cur_index to get condition variable for unblocking.
-
pthread_mutex_unlock(_mutex);
Release mutex lock on _mutex. -
if (s < 1 && index != -1) {pthread_cond_signal(&_cond[index]);}
If previous value of _counter < 1 and _cur_index != -1, unblock thread.
Summary
- park sets _counter to 0 and blocks the thread through pthread_cond_wait/pthread_cond_timedwait.
- unpark sets _counter to 1 and unblocks the thread through pthread_cond_signal.
A call to park will return immediately if the permit is available (_counter = 1), consuming it in the process (_counter = 0); otherwise (_counter = 0) it may block. A call to unpark makes the permit available (_counter = 1), if it was not already available.
Reference
LockSupport doc
cpp reference
open jdk code search
open jdk github
condition-wait-signal-multi-threading
网友评论