第九课 线程本地存储
public static class ThreadLocalVariableHolder{
private static ThreadLocal<Integer> value = new ThreadLocal<Integer>(){
private Random rand = new Random(47);
protected synchronized Integer initialValue(){
return rand.nextInt(1000);
}
};
public static void increment(){
value.set(value.get() + 1);
}
public static int get(){
return value.get();
}
}
虽然只有一个ThreadLocal<Integer> value的静态变量,而且看起来好像只能放进一个int值,
但实际上每个访问这个value对象的线程,都有了自己的一份拷贝,互不干扰。
所以用起来好像是一个Map,而且key是线程id之类的
首先,ThreadLocal 不是用来解决共享对象的多线程访问问题的,一般情况下,通过ThreadLocal.set() 到线程中的对象是该线程自己使用的对象,其他线程是不需要访问的,也访问不到的。各个线程中访问的是不同的对象。
另外,说ThreadLocal使得各线程能够保持各自独立的一个对象,并不是通过ThreadLocal.set()来实现的,而是通过每个线程中的new 对象 的操作来创建的对象,每个线程创建一个,不是什么对象的拷贝或副本。通过ThreadLocal.set()将这个新创建的对象的引用保存到各线程的自己的一个map中,每个线程都有这样一个map,执行ThreadLocal.get()时,各线程从自己的map中取出放进去的对象,因此取出来的是各自自己线程中的对象,ThreadLocal实例是作为map的key来使用的。
如果ThreadLocal.set()进去的东西本来就是多个线程共享的同一个对象,那么多个线程的ThreadLocal.get()取得的还是这个共享对象本身,还是有并发访问问题。
下面来看一个hibernate中典型的ThreadLocal的应用:
private static final ThreadLocal threadSession = new ThreadLocal();
public static Session getSession() throws InfrastructureException {
Session s = (Session) threadSession.get();
try {
if (s == null) {
s = getSessionFactory().openSession();
threadSession.set(s);
}
} catch (HibernateException ex) {
throw new InfrastructureException(ex);
}
return s;
}
可以看到在getSession()方法中,首先判断当前线程中有没有放进去session,如果还没有,那么通过sessionFactory().openSession()来创建一个session,再将session set到线程中,实际是放到当前线程的ThreadLocalMap这个map中,这时,对于这个session的唯一引用就是当前线程中的那个ThreadLocalMap(下面会讲到),而threadSession作为这个值的key,要取得这个session可以通过threadSession.get()来得到,里面执行的操作实际是先取得当前线程中的ThreadLocalMap,然后将threadSession作为key将对应的值取出。这个session相当于线程的私有变量,而不是public的。
显然,其他线程中是取不到这个session的,他们也只能取到自己的ThreadLocalMap中的东西。要是session是多个线程共享使用的,那还不乱套了。
试想如果不用ThreadLocal怎么来实现呢?可能就要在action中创建session,然后把session一个个传到service和dao中,这可够麻烦的。或者可以自己定义一个静态的map,将当前thread作为key,创建的session作为值,put到map中,应该也行,这也是一般人的想法,但事实上,ThreadLocal的实现刚好相反,它是在每个线程中有一个map,而将ThreadLocal实例作为key,这样每个map中的项数很少,而且当线程销毁时相应的东西也一起销毁了,不知道除了这些还有什么其他的好处。
总之,ThreadLocal不是用来解决对象共享访问问题的,而主要是提供了保持对象的方法和避免参数传递的方便的对象访问方式。归纳了两点:
1。每个线程中都有一个自己的ThreadLocalMap类对象,可以将线程自己的对象保持到其中,各管各的,线程可以正确的访问到自己的对象。
2。将一个共用的ThreadLocal静态实例作为key,将不同对象的引用保存到不同线程的ThreadLocalMap中,然后在线程执行的各处通过这个静态ThreadLocal实例的get()方法取得自己线程保存的那个对象,避免了将这个对象作为参数传递的麻烦。
当然如果要把本来线程共享的对象通过ThreadLocal.set()放到线程中也可以,可以实现避免参数传递的访问方式,但是要注意get()到的是那同一个共享对象,并发访问问题要靠其他手段来解决。但一般来说线程共享的对象通过设置为某类的静态变量就可以实现方便的访问了,似乎没必要放到线程中。
ThreadLocal的应用场合,我觉得最适合的是按线程多实例(每个线程对应一个实例)的对象的访问,并且这个对象很多地方都要用到。
下面来看看ThreadLocal的实现原理(jdk1.5源码)
public class ThreadLocal<T> {
/**
* ThreadLocals rely on per-thread hash maps attached to each thread
* (Thread.threadLocals and inheritableThreadLocals). The ThreadLocal
* objects act as keys, searched via threadLocalHashCode. This is a
* custom hash code (useful only within ThreadLocalMaps) that eliminates
* collisions in the common case where consecutively constructed
* ThreadLocals are used by the same threads, while remaining well-behaved
* in less common cases.
*/
private final int threadLocalHashCode = nextHashCode();
/**
* The next hash code to be given out. Accessed only by like-named method.
*/
private static int nextHashCode = 0;
/**
* The difference between successively generated hash codes - turns
* implicit sequential thread-local IDs into near-optimally spread
* multiplicative hash values for power-of-two-sized tables.
*/
private static final int HASH_INCREMENT = 0x61c88647;
/**
* Compute the next hash code. The static synchronization used here
* should not be a performance bottleneck. When ThreadLocals are
* generated in different threads at a fast enough rate to regularly
* contend on this lock, memory contention is by far a more serious
* problem than lock contention.
*/
private static synchronized int nextHashCode() {
int h = nextHashCode;
nextHashCode = h + HASH_INCREMENT;
return h;
}
/**
* Creates a thread local variable.
*/
public ThreadLocal() {
}
/**
* Returns the value in the current thread's copy of this thread-local
* variable. Creates and initializes the copy if this is the first time
* the thread has called this method.
*
* @return the current thread's value of this thread-local
*/
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
return (T)map.get(this);
// Maps are constructed lazily. if the map for this thread
// doesn't exist, create it, with this ThreadLocal and its
// initial value as its only entry.
T value = initialValue();
createMap(t, value);
return value;
}
/**
* Sets the current thread's copy of this thread-local variable
* to the specified value. Many applications will have no need for
* this functionality, relying solely on the {@link #initialValue}
* method to set the values of thread-locals.
*
* @param value the value to be stored in the current threads' copy of
* this thread-local.
*/
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
/**
* Get the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @return the map
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
/**
* Create the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the map
* @param map the map to store.
*/
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
.......
/**
* ThreadLocalMap is a customized hash map suitable only for
* maintaining thread local values. No operations are exported
* outside of the ThreadLocal class. The class is package private to
* allow declaration of fields in class Thread. To help deal with
* very large and long-lived usages, the hash table entries use
* WeakReferences for keys. However, since reference queues are not
* used, stale entries are guaranteed to be removed only when
* the table starts running out of space.
*/
static class ThreadLocalMap {
........
}
}
可以看到ThreadLocal类中的变量只有这3个int型:
private final int threadLocalHashCode = nextHashCode();
private static int nextHashCode = 0;
private static final int HASH_INCREMENT = 0x61c88647;
而作为ThreadLocal实例的变量只有 threadLocalHashCode 这一个,nextHashCode 和HASH_INCREMENT 是ThreadLocal类的静态变量,实际上HASH_INCREMENT是一个常量,表示了连续分配的两个ThreadLocal实例的threadLocalHashCode值的增量,而nextHashCode 的表示了即将分配的下一个ThreadLocal实例的threadLocalHashCode 的值。
可以来看一下创建一个ThreadLocal实例即new ThreadLocal()时做了哪些操作,从上面看到构造函数ThreadLocal()里什么操作都没有,唯一的操作是这句:
private final int threadLocalHashCode = nextHashCode();
那么nextHashCode()做了什么呢:
private static synchronized int nextHashCode() {
int h = nextHashCode;
nextHashCode = h + HASH_INCREMENT;
return h;
}
就是将ThreadLocal类的下一个hashCode值即nextHashCode的值赋给实例的threadLocalHashCode,然后nextHashCode的值增加HASH_INCREMENT这个值。
因此ThreadLocal实例的变量只有这个threadLocalHashCode,而且是final的,用来区分不同的ThreadLocal实例,ThreadLocal类主要是作为工具类来使用,那么ThreadLocal.set()进去的对象是放在哪儿的呢?
看一下上面的set()方法,两句合并一下成为
ThreadLocalMap map = Thread.currentThread().threadLocals;
这个ThreadLocalMap 类是ThreadLocal中定义的内部类,但是它的实例却用在Thread类中:
public class Thread implements Runnable {
......
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
......
}
再看这句:
if (map != null)
map.set(this, value);
也就是将该ThreadLocal实例作为key,要保持的对象作为值,设置到当前线程的ThreadLocalMap 中,get()方法同样大家看了代码也就明白了。
第九课 终结任务
1 任务的自我检查
有些任务会在循环中检查状态值,如canceled之类的,会自己退出任务
但有时我们需要任务更突然的终止任务
注意:如果有标志位canceled,isRunning等,这个一般是volatile的
FutureTask提供的取消应该就是这种情况
2 阻塞时终止
任务除了自我检查状态,也可能阻塞在sleep中,此时可能也需要将其终结
线程的状态:
——new:已经创建完毕,且已经start,资源分配完毕,等待分配时间片了,这个状态只会持续很短的时间,下一步就会进入运行或者阻塞
——run:就绪状态,只要给了时间片,就会运行,在任一时刻,thread可能运行也可能不运行
——block:阻塞状态,程序本身能够运行,但有个条件阻止了它运行,调度器会忽略这个线程,直到跳出阻塞条件,重新进入就绪状态
——dead:run()方法返回,或者被中断
都哪些方式可以进入block状态:
——sleep:等时间到
——wait:等notify
——等待IO
——等待synchronized锁
——等待Lock锁
另外,如下的大循环也可以interrupt
while(!Thread.currentThread().isInterrupted()){
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
终结情形1:
//注意isCanceled是个boolean,而且一般会需要volatile修饰
public void run(){
while(!isCanceled){
//do some seriers work
}
}
终结情形2:
//thread.interrupt()可以打断
//Executors得不到thread的引用,只能通过ExecutorService.shutdownNow()来打断
//如果能拿到Future,可以Future.cancel(true)来打断
//exec.execute(Runnable)看来是打断不了了,因为拿不到什么引用
//exec.submit(),还是能打断的,返回了Future
//本质上都是调用了thread.interrupt()
public void run(){
while(!Thread.currentThread().isInterrupt()){ //或者用Thread.interrupted()判断
//do some seriers work
}
}
public void run(){
while(true){
Thread.sleep(1000); //被interrupt会抛出异常,因为既然是阻塞,被意外终止,异常看似挺合理
//do some seriers work
}
}
终结情形3:终结不了
在等待synchronized的线程,不可以被interrupt
但是注意,Lock可以尝试获取锁,并可以指定阻塞等待锁的时间限制
终结情形4:ReentrantLock.lockInterruptly()
ReentrantLock.lockInterruptly(),在这里获取不到锁,会阻塞,但是可以被interrupt方法中断
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
class BlockedMutex {
private Lock lock = new ReentrantLock();
public BlockedMutex() {
// Acquire it right away, to demonstrate interruption
// of a task blocked on a ReentrantLock:
lock.lock();
}
public void f() {
try {
// This will never be available to a second task
lock.lockInterruptibly(); // Special call
System.out.println("f()方法得到锁了??");
} catch(InterruptedException e) {
System.out.println("f()方法没得到锁,而是被中断了,被interrupt了");
}
}
}
class Blocked2 implements Runnable {
BlockedMutex blocked = new BlockedMutex();
public void run() {
System.out.println("等f()拿到ReentranLock");
blocked.f();
System.out.println("从f()返回了");
}
}
public class Interrupting2 {
public static void main(String[] args) throws Exception {
Thread t = new Thread(new Blocked2());
t.start();
TimeUnit.SECONDS.sleep(1);
System.out.println("调用了t.interrupt()");
t.interrupt();
}
} /* Output:
Waiting for f() in BlockedMutex
Issuing t.interrupt()
Interrupted from lock acquisition in f()
Broken out of blocked call
*///:~
终结情形5:IO密集型阻塞
在等待InputStream.read()的线程,不可以被interrupt
但是有个笨办法:关闭线程正在等待的底层IO资源,如关闭Socket
还有个更好的选择:nio,提供了更人性化的中断,被阻塞的nio通道会自动响应中断
- 注意:
- 1 如果一个Runnable没有cancel类的标志位检查,也没有检查isInterrupt(),调用interrupt()会怎么地?
- 参考c10.NothingToInterrupt,是关不掉的,但这种形式的无限循环,一般不会出现在真实场景里
- 真实场景什么样?一般是一个无限循环,但里面会阻塞(等待条件成熟),有跳出条件
- 2 线程被中断之后,会发异常InterrupttedException,有时需要清理资源,参考类c10.InterruptingIdiom
- 1 如果一个Runnable没有cancel类的标志位检查,也没有检查isInterrupt(),调用interrupt()会怎么地?
第十课 线程通信,线程协作
sleep和yield算是协作,我让你让大家让,但太底层,而且顺序根本不可控,完全不能依赖
join算是协作,等待嘛
wait和notify,算是第一次出现的像样的协作
队列
Piper
生产者和消费者模型
——wait和notify版
——队列模型
----java.utis.concurrent中的构件----
CoundDownLatch
CyclicBarrier
DelayQueue
PriorityBlockingQueue
Exchanger
程序可能在某个地方必须等待另一个线程完成任务,如果用无限循环来检查,这叫忙等待,很耗CPU
1 wait和notify,notifyAll
1.1 简介
obj.wait()和obj.notify()的作用:
——wait释放obj上的锁,所以必须先持有锁了,通过synchronized
——程序在这里开始阻塞,发出的信息就是:我在obj上等待,释放了obj的锁,并且等待notify
——别的程序此时可以拿到obj上的锁了
——notify也会先释放obj的锁,所以也得先拿到锁,obj.notify()会通知在obj上wait的对象
——此时wait的地方会再拿到锁,继续往下执行
public class Test1 {
public static class Waiter implements Runnable{
@Override
public void run() {
synchronized (this) {
System.out.println("我拿到锁了,并且wait了,锁就释放了,并且等待锁");
try {
wait(3000);
System.out.println("wait拿到锁,返回了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Waiter w = new Waiter();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(w);
exec.shutdown();
// Thread.sleep(2000);
// synchronized (w) {
// System.out.println("拿到锁了");
// Thread.sleep(2000);
// w.notify();
// Thread.sleep(2000);
// System.out.println("notify了,notify不会释放锁,走到同步代码最后才释放锁");
// }
}
}
public class Test2 {
public static class Waiter implements Runnable{
@Override
public void run() {
synchronized (this) {
System.out.println("我拿到锁了,并且wait了,锁就释放了,并且等待锁");
try {
wait();
System.out.println("wait拿到锁,返回了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Waiter w = new Waiter();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(w);
exec.shutdown();
Thread.sleep(2000);
synchronized (w) {
System.out.println("拿到锁了");
Thread.sleep(2000);
w.notify();
Thread.sleep(2000);
System.out.println("notify了,notify不会释放锁,走到同步代码最后才释放锁");
}
}
}
public class Test3 {
public static class Waiter implements Runnable{
@Override
public void run() {
synchronized (this) {
System.out.println("我拿到锁了,并且wait了,锁就释放了,并且等待锁");
while(true){
try {
wait();
System.out.println("wait拿到锁,返回了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Waiter w = new Waiter();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(w);
exec.shutdown();
Thread.sleep(2000);
synchronized (w) {
System.out.println("拿到锁了");
Thread.sleep(2000);
w.notify();
Thread.sleep(2000);
System.out.println("notify了,notify不会释放锁,走到同步代码最后才释放锁");
}
}
}
要从wait中恢复,也就是让wait返回,必须满足两个条件:
——有人在同一个对象上notify过
——同一对象的锁被释放
——而notify也需要操作锁,所以也必须持有锁,但这个操作不是释放锁,也就是说notify之后,wait返回之前,还可以执行代码,只要在同步块里,这个时机就是锁释放之前
1.2 套路
一般用法是:
在一个线程里:
synchronized(obj){
while(condition不符合){
obj.wait(); //等到condition符合
}
//处理condition符合之后的逻辑
}
注意,这里有个有缺陷的wait的用法
while(condition不符合){
//Point-1:在这里,
)线程可能切换了,切到另一个线程,并且导致了condition符合了(并notify,但此时这里并未,然后切换回来,wait了,就死锁了
synchronized(obj){
obj.wait();
}
}
在另一个线程里:
synchronized(obj){
///处理condition,让其符合条件
obj.notify();
//做些wait返回之前可能需要做的事
//锁在这准备释放了,wait复活的两个条件都满足了
}
notify会唤醒最后一个在obj上wait的线程
notifyAll会唤醒所有在obj上wait的线程
可以接受时间参数的wait:
——给wait个超时时间
例子:很好的说明了wait和notify怎么用
class Car {
private boolean waxOn = false;
public synchronized void waxed() {
waxOn = true; // Ready to buff
notifyAll();
}
public synchronized void buffed() {
waxOn = false; // Ready for another coat of wax
notifyAll();
}
public synchronized void waitForWaxing()
throws InterruptedException {
while(waxOn == false)
wait();
}
public synchronized void waitForBuffing()
throws InterruptedException {
while(waxOn == true)
wait();
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
System.out.println("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch(InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax On task");
}
}
class WaxOff implements Runnable {
private Car car;
public WaxOff(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
car.waitForWaxing();
System.out.println("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch(InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax Off task");
}
}
public class WaxOMatic {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5); // Run for a while...
exec.shutdownNow(); // Interrupt all tasks
}
} /* Output: (95% match)
Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Exiting via interrupt
Ending Wax On task
Exiting via interrupt
Ending Wax Off task
*///:~
- 总结:
- 这个例子有点太场景化,也就是太特殊,如果要用同步队列来实现:
- 上蜡和抛光可以看做是两条流水线,并且原则是:一条流水线处理一个队列里的消息
- 等待上蜡的队列,等待抛光的队列,而两条流水线,就变成了在queue.take()或者poll()时阻塞,思路就清楚多了
- 但是注意,不是所有的业务模型都可以映射到队列模型
1.3 为什么要在while(true)里wait
- 再说notifyAll和while(true){wait();}的问题
- 一个或多个任务在一个对象上等待同一个条件,不管谁被唤醒之后,条件可能已经改变了,所以wait醒来之后还是要对条件进行检查
- 在一个对象上wait的线程可能有多个,而且可能是针对不同的条件,所以被唤醒之后,需要检查一下是否自己需要的条件
- 总之,wait醒来之后,需要检查自己需要的条件是否满足,不满足的话还得继续wait
- 所以,在while(true)中wait通常是比较合理的做法
- notify只能唤醒一个
1.4 notify和notifyAll的区别
在同一个对象上有多个线程wait,notify唤醒最后一个等待的,notifyAll唤醒所有
2 Condition,await和signal
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
public class WaxOMatic2 {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
public static class Car {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean waxOn = false;
public void waxed() {
lock.lock();
try {
waxOn = true; // Ready to buff
condition.signalAll();
} finally {
lock.unlock();
}
}
public void buffed() {
lock.lock();
try {
waxOn = false; // Ready for another coat of wax
condition.signalAll();
} finally {
lock.unlock();
}
}
public void waitForWaxing() throws InterruptedException {
lock.lock();
try {
while(waxOn == false)
condition.await();
} finally {
lock.unlock();
}
}
public void waitForBuffing() throws InterruptedException{
lock.lock();
try {
while(waxOn == true)
condition.await();
} finally {
lock.unlock();
}
}
}
public static class WaxOn implements Runnable {
private Car car;
public WaxOn(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
System.out.println("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch(InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax On task");
}
}
public static class WaxOff implements Runnable {
private Car car;
public WaxOff(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
car.waitForWaxing();
System.out.println("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch(InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax Off task");
}
}
} /* Output: (90% match)
Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Exiting via interrupt
Ending Wax Off task
Exiting via interrupt
Ending Wax On task
*///:~
这里要用到的锁就是ReentranLock
3 同步队列:接口BlockingQueue
3.1 简介
总而言之,同步队列可以使很多业务模型得以简化,处理问题的思路更简单
java提供了大量的BlockingQueue接口的实现,例子参考BlockingQueueTest
public interface BlockingQueue<E> extends Queue<E> {
/**
* 添加,如果没有空间,会阻塞等待
* @param e
* @throws InterruptedException
*/
void put(E e) throws InterruptedException;
/**
* 移除并返回,如果empty,则阻塞等待
*/
E take() throws InterruptedException;
/**
* 移除并返回,如果empty,会等待指定时间
* @param timeout
* @param unit
* @return
*/
E poll(long timeout, TimeUnit unit);
/**
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
* limit.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
* an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*
* @return the remaining capacity
*/
int remainingCapacity();
public boolean contains(Object o);
/**
* 把队列里的元素都移到Collection里
* @param c
* @return
*/
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
java.util.concurrent.BlockingQueue<Message> queue =
//new ArrayBlockingQueue<BlockingQueueTest.Message>(10, true); //true是access policy,表示FIFO,先进先出
//new LinkedBlockingDeque<BlockingQueueTest.Message>(10);
//new DelayQueue<BlockingQueueTest.Message>();
//new PriorityBlockingQueue<BlockingQueueTest.Message>();
//new SynchronousQueue<BlockingQueueTest.Message>(true);
//new LinkedTransferQueue<BlockingQueueTest.Message>();
常规:
ArrayBlockingQueue
LinkedBlockingDeque
延迟队列:
DelayQueue
优先级队列
PriorityBlockingQueue
不懂的队列:
SynchronousQueue
LinkedTransferQueue
3.2 例子
例子1:PriorityQueue,优先级队列
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
private Random rand = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;
protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>();
public PrioritizedTask(int priority) {
this.priority = priority;
sequence.add(this);
}
public int compareTo(PrioritizedTask arg) {
return priority < arg.priority ? 1 : (priority > arg.priority ? -1 : 0);
}
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
} catch (InterruptedException e) {
// Acceptable way to exit
}
System.out.println(this);
}
public String toString() {
return String.format("[%1$-3d]", priority) + " Task " + id;
}
public String summary() {
return "(" + id + ":" + priority + ")";
}
public static class EndSentinel extends PrioritizedTask {
private ExecutorService exec;
public EndSentinel(ExecutorService e) {
super(-1); // Lowest priority in this program
exec = e;
}
public void run() {
int count = 0;
for (PrioritizedTask pt : sequence) {
System.out.println(pt.summary());
if (++count % 5 == 0)
System.out.println();
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class PrioritizedTaskProducer implements Runnable {
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
queue = q;
exec = e; // Used for EndSentinel
}
public void run() {
// Unbounded queue; never blocks.
// Fill it up fast with random priorities:
for (int i = 0; i < 20; i++) {
queue.add(new PrioritizedTask(rand.nextInt(10)));
Thread.yield();
}
// Trickle in highest-priority jobs:
try {
for (int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10));
}
// Add jobs, lowest priority first:
for (int i = 0; i < 10; i++)
queue.add(new PrioritizedTask(i));
// A sentinel to stop all the tasks:
queue.add(new PrioritizedTask.EndSentinel(exec));
} catch (InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished PrioritizedTaskProducer");
}
}
class PrioritizedTaskConsumer implements Runnable {
private PriorityBlockingQueue<Runnable> q;
public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {
this.q = q;
}
public void run() {
try {
while (!Thread.interrupted())
// Use current thread to run the task:
q.take().run();
} catch (InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished PrioritizedTaskConsumer");
}
}
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws Exception {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
exec.execute(new PrioritizedTaskConsumer(queue));
}
} /* (Execute to see output) */// :~
- 简介:
- 基于优先级,优先级的排序规则,由你来实现,就是实现方法compareTo
- 用到了add和take方法,和take配对的不是put吗
- 队列的元素类型Task,必须实现Comparable来对优先级进行排序,以保证按优先级顺序来
PriorityBlockingQueue代码1000多行呢
例子2:DelayQueue,延迟队列
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence = new ArrayList<DelayedTask>();
public DelayedTask(int delayInMilliseconds) {
delta = delayInMilliseconds;
trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
}
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
}
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask) arg;
if (trigger < that.trigger)
return -1;
if (trigger > that.trigger)
return 1;
return 0;
}
public void run() {
System.out.println(this + " ");
}
public String toString() {
return String.format("[%1$-4d]", delta) + " Task " + id;
}
public String summary() {
return "(" + id + ":" + delta + ")";
}
public static class EndSentinel extends DelayedTask {
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e) {
super(delay);
exec = e;
}
public void run() {
for (DelayedTask pt : sequence) {
System.out.println(pt.summary() + " ");
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q;
}
public void run() {
try {
while (!Thread.interrupted())
q.take().run(); // Run task with the current thread
} catch (InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished DelayedTaskConsumer");
}
}
public class DelayQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
// Fill with tasks that have random delays:
for (int i = 0; i < 20; i++)
queue.put(new DelayedTask(rand.nextInt(5000)));
// Set the stopping point
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
} /*
* Output: [128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task
* 1 [961 ] Task 4 [998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14
* [1861] Task 3 [2278] Task 15 [3288] Task 10 [3551] Task 12 [4258] Task 0
* [4258] Task 19 [4522] Task 8 [4589] Task 13 [4861] Task 17 [4868] Task 6
* (0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522)
* (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998)
* (17:4861) (18:520) (19:4258) (20:5000) [5000] Task 20 Calling shutdownNow()
* Finished DelayedTaskConsumer
*/// :~
- 要点:
- 队列的元素类型:需要实现Delayed接口,包括getDelay()和compareTo()两个方法
- getDlay决定了任务是否到时可以运行
- compareTo决定了队列会排序,并且队列头就是第一个应该被运行的任务
- 带着问题看代码:
- 普通队列在take()这里阻塞,有新任务put进来,则被唤醒,所以只要有任务,就不会阻塞
- DelayQueue即使有消息,也会阻塞,表示任务都没到时间,这种阻塞,何时被唤醒?
- DelayQueue内部有个PriorityQueue
- 参考下面的代码,就是DelayQueue的take方法实现
- 取出队列头(peek,看看,不删除)
- 如果是null,说明队列空,则Condition.await()
- 如果不空,则根据任务的int delay = getDelay()方法,计算出delay时间,即多久之后任务可运行
- 注意队列是根据你实现的compareTo排好序的,队列头就应该是第一个可以被运行的任务
- getDelay()方法也是你实现的,在任务内部,你要记录延时时间(时间间隔),触发时间(具体时刻,计算出来并存好),所以随时随刻你都可以在getDelay()方法中计算出任务多久之后应该被触发
- 如果delay <= 0,就返回(queue.poll()),任务可以执行了
- 如果delay > 0,说明还没到时,则Condition.awaitNanos(delay),阻塞指定时间后,再开始新一轮for循环,此时队列头的任务应该到时了
- leader不知道是干啥的啊,先当成一直是null来理解的
- ReentrantLock决定了多个线程在队列上take时,同一时刻只有一个线程会进入,所以只会有一个线程在await上阻塞,其他会在ReentrantLock上阻塞
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
Thread leader
的作用:没明白
Thread designated to wait for the element at the head of
the queue. This variant of the Leader-Follower pattern
(http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
minimize unnecessary timed waiting. When a thread becomes
the leader, it waits only for the next delay to elapse, but
other threads await indefinitely. The leader thread must
signal some other thread before returning from take() or
poll(...), unless some other thread becomes leader in the
interim. Whenever the head of the queue is replaced with
an element with an earlier expiration time, the leader
field is invalidated by being reset to null, and some
waiting thread, but not necessarily the current leader, is
signalled. So waiting threads must be prepared to acquire
and lose leadership while waiting.
3.3更多
- 一些实践中的应用
- 安卓的Looper和Handler可以作为一个通用的
- UI框架的大循环,是一个阻塞式的,循环处理的其实是UI上用户操作产生的事件,界面不会频繁刷新
- 游戏框架的大循环,是一个非阻塞式的,每一轮循环必须完成刷新界面的操作,也必须处理用户的输入和游戏逻辑,并且有被限制的帧率
- 有些特定的业务模型,可以用队列来简化,如抢单:(本来打算写写抢单逻辑,但发现单单从同步队列来考虑还不够)
- Model层的repo负责存储订单,提供增删改查操作,并对外通过事件总线发出通知
- 线程A负责接收服务器来的订单消息,repo.add()
- 线程B负责不断遍历所有订单,在后台刷新每个订单的倒计时等恒变状态,并调用repo.update(), repo.delete()
- 主线程负责:思路断了
- 重新调整思路:
- Model层不变
- 线程A不变
- 现在说谁能改变订单状态:
- 后台任务,主要是各种倒计时
- 用户操作
- 编不下去了
4 管道:PiperWriter和PiperReader
这个io在read上的阻塞是可以interrupt的,与之相比,System.in.read()就不可中断
网友评论