我们知道Java语言的Object类中提供了wait、notify和notifyAll方法来实现等待/通知机制。
通过调用wait挂起线程,通过notify或notifyAll来唤醒线程。
wait、notify和notifyAll方法都要求调用之前必须获取对象关联的监视器的使用权,也就是说这三个方法只能是在synchronized块或者synchonized方法中被调用,而两个线程之间可能并不是互斥的,也就是说它们不会互相抢夺资源,只是一个协作的关系,没有同步的必要。
wait、notify和notifyAll方法的调用的主体是第三方对象,并不直接在代码中操作线程。而我们想做的其实就是直接操作线程,比如说有一个方法以线程对象为参数让它进入WAITTING状态。
上面说了我们操作的不便和不必须,那么有没有机制来解决呢?
有的,JDK1.5提供了java.util.concurrent.locks.LockSupport类,它可以解决我们上面所说的问题。
public static void park() ;
public static void park(Object blocker);
public static void parkUntil(long deadline) ;
public static void parkUntil(Object blocker, long deadline) ;
public static void parkNanos(long nanos)
public static void parkNanos(Object blocker, long nanos);
public static void unpark(Thread thread)
上面是LockSupport为我们提供了关于等待/唤醒操作的方法。
与Object类的wait/notify机制相比,park/unpark有两个优点:
- 以thread为操作对象更符合阻塞线程的直观定义;
- 操作更精准,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程),增加了灵活性。
LockSupport等待和唤醒线程的实现原理与Object类提供的等待和唤醒机制是不同的。
LockSupport调度不需要监视器的使用权,它是通过许可的检查来决定怎样操作线程。
现在先来使用LockSupport来模拟买书的情景。
import java.util.concurrent.locks.LockSupport;
public class BookTradeLockSupport {
public static void main(String[] args) throws InterruptedException {
// 读者
Thread threadB = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "在等着买书...");
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "我买到书了...");
}
}, "读者");
// 作者
Thread threadA = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "在写书...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "写好书了...");
LockSupport.unpark(threadB);
}
}, "作者");
threadA.start();
threadB.start();
}
}
从上面可以看到,没有了同步代码块,而且threadA和threadB的启动顺序也不影响结果。
API说明
park
API中对park方法的说明如下:
Disables the current thread for thread scheduling purposes unless the permit is available.
除非"许可证"是可用的,不然的话就使线程被调度的能力。
拥有"许可证"那么就能被线程调度器调度,反之,挂起线程。
现在我们深入源码来看一下park方法的实现原理。
LockSupport.java
public static void park() {
UNSAFE.park(false, 0L);
}
这里调用了UNSAFE类的park方法,这个类的park方法如下:
Unsafe.java
/**
* Block current thread, returning when a balancing
* <tt>unpark</tt> occurs, or a balancing <tt>unpark</tt> has
* already occurred, or the thread is interrupted, or, if not
* absolute and time is not zero, the given time nanoseconds have
* elapsed, or if absolute, the given deadline in milliseconds
* since Epoch has passed, or spuriously (i.e., returning for no
* "reason"). Note: This operation is in the Unsafe class only
* because <tt>unpark</tt> is, so it would be strange to place it
* elsewhere.
*/
public native void park(boolean isAbsolute, long time);
这是个本地方法,继续查看:
unsafe.cpp
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
UnsafeWrapper("Unsafe_Park");
EventThreadPark event;
#ifndef USDT2
HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
HOTSPOT_THREAD_PARK_BEGIN(
(uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
JavaThreadParkedState jtps(thread, time != 0);
thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
HOTSPOT_THREAD_PARK_END(
(uintptr_t) thread->parker());
#endif /* USDT2 */
if (event.should_commit()) {
oop obj = thread->current_park_blocker();
event.set_klass((obj != NULL) ? obj->klass() : NULL);
event.set_timeout(time);
event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
event.commit();
}
UNSAFE_END
关键的是这句:
thread->parker()->park(isAbsolute != 0, time);
它调用了线程的Parker对象的park方法。
在HotSpot中,每个java线程都有一个Parker的实例。
如下是Parker类的定义:
Parker.hpp
class Parker : public os::PlatformParker {
private:
volatile int _counter ;
Parker * FreeNext ;
JavaThread * AssociatedWith ; // Current association
public:
Parker() : PlatformParker() {
_counter = 0 ;
FreeNext = NULL ;
AssociatedWith = NULL ;
}
可以看到定义了_counter属性,并且初始值为0。
Parker类的park方法在不同平台有不同的实现,现在以Linux为例说明:
os_linux.cpp
void Parker::park(bool isAbsolute, jlong time) {
// Ideally we'd do something useful while spinning, such
// as calling unpackTime().
// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// Optional optimization -- avoid state transitions if there's an interrupt pending.
// Check interrupt before trying to wait
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
timespec absTime;
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
return;
}
if (time > 0) {
unpackTime(&absTime, isAbsolute, time);
}
// Enter safepoint region
// Beware of deadlocks such as 6317397.
// The per-thread Parker:: mutex is a classic leaf-lock.
// In particular a thread must never block on the Threads_lock while
// holding the Parker:: mutex. If safepoints are pending both the
// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
ThreadBlockInVM tbivm(jt);
// Don't wait if cannot get lock since interference arises from
// unblocking. Also. check interrupt before trying wait
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
int status ;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
return;
}
#ifdef ASSERT
// Don't catch signals while blocked; let the running threads have the signals.
// (This allows a debugger to break into the running thread.)
sigset_t oldsigs;
sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
assert(_cur_index == -1, "invariant");
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
_cur_index = -1;
assert_status(status == 0 || status == EINTR ||
status == ETIME || status == ETIMEDOUT,
status, "cond_timedwait");
#ifdef ASSERT
pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}
看下面的一段代码:
if (Atomic::xchg(0, &_counter) > 0) return;
正如注释上描述的那样,这是一个可选的快速检查动作。检查什么呢?看下Atomic原子操作类的xchg方法的作用是:
atomic.cpp
// Performs atomic exchange of *dest with exchange_value. Returns old prior value of *dest.
inline static jint xchg(jint exchange_value, volatile jint* dest);
static unsigned int xchg(unsigned int exchange_value, volatile unsigned int* dest);
原子性的将dest的值设置为exchange_value的值,然后返回dest前的值。
我们上面调用中dest是_counter属性,那么这个调用的作用就是原子性地将_counter的值设置为0,然后返回_counter设置前的值,如果设置前的值大于0那么就立即返回。
也就是说如果_counter已经被其他线程修改为非0了,那么调用park方法就不起作用,当前线程不会挂起,而是直接返回继续执行逻辑,并且将当前线程的Parker对象的_counter属性置为0。
从这里可以看出这个_counter属性就是我们上面所说的”许可证“的体现,初始情况下,_counter为0,那么就是未拥有"许可证",调用park方法就会挂起。
继续查看代码,
// Optional optimization -- avoid state transitions if there's an interrupt pending.
// Check interrupt before trying to wait
if (Thread::is_interrupted(thread, false)) {
return;
}
如果当前线程是被打断的,那么也立即返回。
// Don't wait if cannot get lock since interference arises from
// unblocking. Also. check interrupt before trying wait
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
这段代码是什么意思,意义何在?
我们来分析一下:
通过上面的讲解,我们应该很清楚了LockSupport.park可能会使得当前线程t(就是执行LockSupport.park这段代码的这个线程)被挂起。
但是在并发的环境下,也会出现其他线程调用LockSupport.unpark(t)对t执行唤醒操作。
一边是要挂起,一个是要唤醒,起冲突了。
也就是说挂起和唤醒操作要进行同步处理(互斥执行)。
那么,LockSupport就必须提供这种支持。
说起同步,这时候我们一定会想起我们上面说的wait/notify机制。
还记得我们在代码中是怎么写wait和notify的吧,是像这样的伪代码:
thread 1:
synchronized(obj) {
obj.wait();
}
thread2:
synchronized(obj) {
obj.notify();
}
这里通过监视器机制就保证了thread1和thread2互斥得执行wait和notify。
LockSupport的实现不采用监视器机制,而是通过互斥锁的方式实现的。
上面的_mutex就是一个互斥锁。
os_linux.cpp
class PlatformParker : public CHeapObj<mtInternal> {
protected:
enum {
REL_INDEX = 0,
ABS_INDEX = 1
};
int _cur_index; // which cond is in use: -1, 0, 1
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [2] ; // one for relative times and one for abs.
现在再来看一下pthread_mutex_trylock()方法。
pthread_mutex_trylock()是Linux底层提供的一个方法,方法说明如下:
pthread_mutex_trylock() 是 pthread_mutex_lock() 的非阻塞版本。如果 mutex 所引用的互斥对象当前被任何线程(包括当前线程)锁定,则将立即返回该调用。否则,该互斥锁将处于锁定状态,调用线程是其属主。
pthread_mutex_trylock() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。
好了,到这里,应该很清楚上面那段代码的含义了吧,含义就是:
挂起线程前尝试通过pthread_mutex_trylock方法获取互斥锁(_mutex),如果获取不成功,说明有其他线程获取到了这个互斥锁,正在对我们想挂起的线程执行唤醒操作,那么就放弃执行,立即返回。
现在,来看看挂起具体的逻辑:
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
}
pthread_cond_wait是Linux底层提供了一个方法,API说明如下:
int pthread_cond_wait( pthread_cond_t* cond, pthread_mutex_t* mutex );
Wait on a condition variable.
在一个状态变量上进行等待。参数说明
cond
A pointer to the pthread_cond_t object that you want the threads to block on.mutex
The mutex that you want to unlock.
下面代码是_cond数组的定义:
os_linux.cpp
class PlatformParker : public CHeapObj<mtInternal> {
protected:
enum {
REL_INDEX = 0,
ABS_INDEX = 1
};
int _cur_index; // which cond is in use: -1, 0, 1
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [2] ; // one for relative times and one for abs.
public: // TODO-FIXME: make dtor private
~PlatformParker() { guarantee (0, "invariant") ; }
public:
PlatformParker() {
int status;
status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
assert_status(status == 0, status, "cond_init rel");
status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
assert_status(status == 0, status, "cond_init abs");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
_cur_index = -1; // mark as unused
}
};
初始化:
status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
下面通过一段伪代码来看看pthread_cond_wait如何使用:
挂起
pthread_mutex_lock(&mutex); // 拿到互斥锁,进入临界区
while( 条件为假)
pthread_cond_wait(cond, mutex); // 令进程等待在条件变量上
修改条件
pthread_mutex_unlock(&mutex); // 释放互斥锁
通知
pthread_mutex_lock(&mutex); // 拿到互斥锁,进入临界区
设置条件为真
pthread_cond_signal(cond); // 通知等待在条件变量上的消费者
pthread_mutex_unlock(&mutex); // 释放互斥锁
通过上面的描述我们知道通过调用pthread_cond_wait调用使得线程挂起在&_cond[REL_INDEX]变量上,等待通知。
而且我们也可以知道了唤醒的操作肯定有:
pthread_cond_signal (&_cond[_cur_index]);
这样的代码。
我们上面通过pthread_mutex_trylock获取到了互斥锁的使用权,那么当我们执行完挂起操作后,就需要释放掉,以使得唤醒线程能够获取。
在上面方法的末尾可以看到这个逻辑的代码:
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
unpark
API解释如下:
Makes available the permit for the given thread, if it was not already available. If the thread was blocked on park then it will unblock. Otherwise, its next call to park is guaranteed not to block. This operation is not guaranteed to have any effect at all if the given thread has not been started.
使得指定的线程的"许可证"可用,如果线程是挂起的那么就唤醒它。不然,下一个park的调用不保证挂起。如果指定的线程未启动,那么这个方法不起作用。
下面还是通过源码来研究一下这个方法。
LockSupport.java
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
这里调用了UNSAFE类的unpark方法,这个类的unpark方法如下:
Unsafe.java
/**
* Unblock the given thread blocked on <tt>park</tt>, or, if it is
* not blocked, cause the subsequent call to <tt>park</tt> not to
* block. Note: this operation is "unsafe" solely because the
* caller must somehow ensure that the thread has not been
* destroyed. Nothing special is usually required to ensure this
* when called from Java (in which there will ordinarily be a live
* reference to the thread) but this is not nearly-automatically
* so when calling from native code.
* @param thread the thread to unpark.
*
*/
public native void unpark(Object thread);
这是个本地方法,继续查看:
unsafe.cpp
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
UnsafeWrapper("Unsafe_Unpark");
Parker* p = NULL;
if (jthread != NULL) {
oop java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
jlong lp = java_lang_Thread::park_event(java_thread);
if (lp != 0) {
// This cast is OK even though the jlong might have been read
// non-atomically on 32bit systems, since there, one word will
// always be zero anyway and the value set is always the same
p = (Parker*)addr_from_java(lp);
} else {
// Grab lock if apparently null or using older version of library
MutexLocker mu(Threads_lock);
java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
JavaThread* thr = java_lang_Thread::thread(java_thread);
if (thr != NULL) {
p = thr->parker();
if (p != NULL) { // Bind to Java thread for next time.
java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
}
}
}
}
}
}
if (p != NULL) {
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
HOTSPOT_THREAD_UNPARK(
(uintptr_t) p);
#endif /* USDT2 */
p->unpark();
}
UNSAFE_END
看下面代码:
p = thr->parker();
这里是获取要唤醒的线程对象的Parker对象,
p->unpark();
执行Parker的unpark方法,Parker类的unpark方法代码如下:
os_linux.cpp
void Parker::unpark() {
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
_counter = 1;
if (s < 1) {
// thread might be parked
if (_cur_index != -1) {
// thread is definitely parked
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
看下这里:
_counter = 1;
从这里可以看出,无论调用unpark多少次,Parker的_counter变量的值都是1。
也就是说进一步说明这个_counter变量其实就是个状态变量:有和没有。
park中我们已经讲得很详细了,再看这个方法的源码就觉得很简单了,方法逻辑就是加锁-唤醒-解锁。
pthread_mutex_lock(_mutex);
...
pthread_cond_signal (&_cond[_cur_index]);
...
pthread_mutex_unlock(_mutex);
park(Object blocker)
这个方法除了设置blocker之外,其他的跟park是一样的。
对blocker参数,API中是这样描述的:
This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. (Such tools may access blockers using method [
getBlocker(Thread)
]. The use of these forms rather than the original forms without this parameter is strongly encouraged. The normal argument to supply as ablocker
within a lock implementation isthis
.
此对象在线程受阻塞时被记录,以允许监视工具和诊断工具确定线程受阻塞的原因。(这样的工具可以使用方法 getBlocker(java.lang.Thread) 访问 blocker。)建议最好使用这些形式,而不是不带此参数的原始形式。在锁实现中提供的作为 blocker 的普通参数是 this。
从上面的说明是便于问题分析,目前没有好的实例好说明,日后碰到了再补充。
网友评论