一、 概括图
概括图二、程序示例
1. CountDownLatch
package test.java;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(10);
for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
start.await();
System.out.println("Task is running...");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
end.countDown();
}
}
}).start();
}
long startTime = System.currentTimeMillis();
start.countDown();
end.await();
long endTime = System.currentTimeMillis();
System.out.println("All running time:"+ (endTime - startTime)+" minis");
}
}
2. Semaphore
package test.java;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
public class BoundList<T> {
private final List<T> list;
private final Semaphore semaphore;
public BoundList(int bound){
list = Collections.synchronizedList(new ArrayList<>());
semaphore = new Semaphore(bound);
}
public boolean add(T t) throws InterruptedException {
semaphore.acquire();
boolean isAdd = false;
try {
isAdd = list.add(t);
return isAdd;
}finally {
if(!isAdd){
semaphore.release();
}
}
}
public boolean remove(T t){
boolean isRemoved = list.remove(t);
if(isRemoved){
semaphore.release();
}
return isRemoved;
}
}
3. CyclicBarrier
package test.java;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
private final CyclicBarrier barrier;
private final worker[] workers;
public CyclicBarrierTest(int count){
this.barrier = new CyclicBarrier(count, new Runnable() {
@Override
public void run() {
System.out.println("All thread have got the barrier...");
}
});
this.workers = new worker[count];
for(int i = 0; i < count; i++){
workers[i] = new worker();
}
}
public void start(){
for(int i = 0; i < workers.length; i++){
new Thread(workers[i]).start();
}
}
private class worker implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+ " is running...");
try {
barrier.await();
} catch (InterruptedException e) {
return;
} catch (BrokenBarrierException e) {
return;
}
System.out.println(Thread.currentThread().getName() + " Out of barrier...");
}
}
}
4. Exchanger
package test.java;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Exchanger;
public class ExchangerTest {
private Exchanger<BlockingQueue> exchanger;
private BlockingQueue<String> reader;
private BlockingQueue<String> writer;
public ExchangerTest(Exchanger<BlockingQueue> exchanger,
BlockingQueue<String> reader,
BlockingQueue<String> writer) {
this.exchanger = exchanger;
this.reader = reader;
this.writer = writer;
}
public void exchangeQueue(){
new Thread(new Runnable() {
@Override
public void run() {
while(true){
if(reader.isEmpty()){
try {
reader = exchanger.exchange(reader);
System.out.println("The reader is " + reader.hashCode());
} catch (InterruptedException e) {
return;
}
}else{
reader.remove();
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while (true){
if(writer.size() == 15){
try {
writer = exchanger.exchange(writer);
System.out.println("The writer is "+ writer.hashCode());
} catch (InterruptedException e) {
return;
}
}else{
writer.add("123");
}
}
}
}).start();
}
}
Exchanger类仅可用作两个线程的信息交换,当超过两个线程调用同一个exchanger对象时,得到的结果是随机的,exchanger对象仅关心其包含的两个“格子”是否已被填充数据,当两个格子都填充数据完成时,该对象就认为线程之间已经配对成功,然后开始执行数据交换操作。而剩下的未得到配对的线程,则会被阻塞,永久等待,直到与之配对的线程到达位置。
网友评论