“工欲善其事必先利其器”,有了这些并发工具,多线程控制变得So easy。
与文无关JDK中已经给我们内置了很多并发工具,都属于应用类型,知道具体如何使用就好,主要讲以下几个类:
- CountDownLatch
- CyclicBarrier
- Semaphore
- LockSupport
- BlockingQueue
这次的几个案例都需要实际运行,看运行效果才明白怎么回事,代码可以直接复制粘贴。
CountDownLatch
多线程控制类,计数器栅栏,当计数器满足条件的时候,再开始执行接下来的操作。
public class CountDownLatchTest {
static final int THREAD_COUNT = 10;
static final CountDownLatch end = new CountDownLatch(THREAD_COUNT);
public static void main(String[] args) throws InterruptedException {
Runnable demo = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("检查完成");
end.countDown();
}
};
//线程池内有5个线程方便看效果
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < THREAD_COUNT; i++) {
executorService.submit(demo);
}
end.await();
System.out.println("一切就绪");
executorService.shutdown();
}
}
CountDownLatch 运行效果
CyclicBarrier
循环栅栏,可以看做CountDownLatch的重复利用。当满足一定的条件时候,才开始执行某线程。
// 当线程的数量满足条件时候,才开始执行。
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
System.out.println("一切就绪,准备出发");
}
});
Runnable task = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getId() + ":就绪");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
};
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
}
CyclicBarrier运行结果
所有的线程都在等待,当等待的线程达到一定的数量,然后开始执行接下来的操作。
Semaphore
Semaphore,也是控制线程的一种手段,可以控制并发线程的数量,某些时候我们线程数过多,在访问有限的资源时候,可以使用Semaphore来控制线程的数量。
public class SemaphoreDemo implements Runnable {
Semaphore mSemaphore = new Semaphore(5);
@Override
public void run() {
try {
mSemaphore.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + " done!");
mSemaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
SemaphoreDemo demo = new SemaphoreDemo();
for (int i = 0; i < 20; i++) {
executorService.submit(demo);
}
executorService.shutdown();
;
}
}
Semaphore 运行案例
LockSupport
LockSupport提供了一些静态方法用于阻塞线程,和唤醒线程的功能。
处于park()挂起状态的线程是Waiting状态,park()方法阻塞的线程还支持中断,不抛出中断异常的同时设置中断标志位,然后我们可以通过中断标志位来检查。
public class LockDemo implements Runnable{
public static Object sObject = new Object();
@Override
public void run() {
synchronized (sObject){
System.out.println("当前线程名称:" + Thread.currentThread().getName());
LockSupport.park();
if (Thread.currentThread().isInterrupted()){
System.out.println( Thread.currentThread().getName() + "被中断了");
}
System.out.println("执行结束");
}
}
public static void main(String[] args) throws InterruptedException {
LockDemo demo = new LockDemo();
Thread t1 = new Thread(demo,"t1");
Thread t2 = new Thread(demo,"t2");
t1.start();
Thread.sleep(3000);
t2.start();
t1.interrupt();
LockSupport.unpark(t2);
}
}
LockSupport的demo
BlockingQueue
Java的Queue也是面试中经常提到的知识点,这次因为我们只涉及到并发相关知识,所以只提一些并发相关的Queue,关于Queue的具体分析等后面的数据结构系列的时候再详细解说。
BlockingQueue是Java中的阻塞队列,JDK中提供了7个阻塞队列
- ArrayBlockingQueue : 数组实现的有界队列,对元素进行FIFO(先进先出)的原则排序。
- LinkedBlockingQueue: 链表组成的有界队列,长度默认最大值为Integer.MAX_VALUE,元素按FIFO原则排序,性能好于ArrayBlockingQueue。
- PriorityBlockingQueue:支持优先级的无界阻塞队列。
- DelayQueue: 支持延迟获取元素的无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列。每一个put操作必须等待take操作,否则不能继续添加元素。
- LinkedTransferQueue:链表组成的无界传输队列
- LinkedBlockingDeque:由链表组成的双向阻塞队列。可以从两段插入和移除元素。
带大家看一下LinkedBlockingQueue的几个关键方法:
//LinkedBlockingQueue 方法探索
// 添加元素
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
//如果队列满了,直接返回false
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
// 创建新的节点
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 如果队列不满的话,就让元素加入队列。
//然后判断,当前队列元素是否满了,不满的话,通知notFull条件。
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// 假如添加的是第一个元素,通知队列不为空了。
if (c == 0)
signalNotEmpty();
return c >= 0;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 当队列满的时候进行等待。若不满入队
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
// 同offer
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 同offer
if (c == 0)
signalNotEmpty();
}
可以看出添加元素上:
- 当队列满的时候,offer不添加元素,立刻返回。put则会阻塞操作,直到队列为不满。
- 还有一个带参数的offer方法,和put的唯一区别就是有超时时间,在一段时间内队列还不空的话,就返回。
//LinkedBlockingQueue 方法探索
// 移除
public E poll() {
final AtomicInteger count = this.count;
// 队列为空,返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//队列有元素的话,取出元素
//取出元素后如果队列是不为空,发出不为空的信号。
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//如果取出元素之前,队列是满的,因为取出了元素,现在发出不满的信号
if (c == capacity)
signalNotFull();
return x;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列为空的话,就等待队列不为空。
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
可以看出LinkedBlockingQueue的移除操作poll和take方法:
- poll不阻塞,take会阻塞
- poll(long timeout, TimeUnit unit),当队列为空的时候,等待指定的时间,如果队列扔为空,那么就返回。
这次是带领大家一起看了下LinkedBlockingQueue的关键方法,其它的队列的操作也都类似,望大家自行查看,JDK中Queue的实现并不难理解。
最后
这次主要介绍了几个并发中可能会用到的工具类,最后说了下JDK并发包中的阻塞队列,阻塞队列相对比较重要,就简单的分析了其实现。
希望能帮助到大家。
参考
- 《Java并发实战》
- 《Java高并发程序设计》
- 《并发编程的艺术》
网友评论