1. 栅栏
3个工作线程,同时开始执行
public class BarrierTest {
private static class Worker implements Runnable
{
private CyclicBarrier cyclicBarrier;
public Worker(CyclicBarrier cyclicBarrier)
{
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+" is waiting");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+" 开始执行");
TimeUnit.SECONDS.sleep(new Random().nextInt(3));
System.out.println(Thread.currentThread().getName()+" 执行完毕");
}catch (Exception e)
{
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int threadCount = 3;
ExecutorService executorService = Executors.newCachedThreadPool();
CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
for(int i=0;i<threadCount;i++)
{
System.out.println("创建工作线程"+i);
Worker worker = new Worker(cyclicBarrier);
executorService.execute(worker);
}
executorService.shutdown();
}
}
2. 信号量
使用semaphore实现有界HashSet
class BoundHashSet<T>
{
private final Set<T> hashSet;
private final Semaphore semaphore;
public BoundHashSet(int bound) {
hashSet = Collections.synchronizedSet(new HashSet<>());
this.semaphore = new Semaphore(bound);
}
public boolean add(T data) throws InterruptedException {
semaphore.acquire();
boolean wasAdd = false;
try {
wasAdd= hashSet.add(data);
return wasAdd;
}finally {
if(!wasAdd)
{
semaphore.release();
}
}
}
public boolean remove(Object o)
{
boolean wasRemove = hashSet.remove(o);
if(wasRemove)
{
semaphore.release();
}
return wasRemove;
}
}
3.闭锁
闭锁有2种,countDownLatch和FutureTask
- countDownLatcher
首先是countDownLatch的demo
JAVA 并发编程实战上面的demo
public static long timeTasks(int nThreads, final Runnable task) throws InterruptedException
{
final CountDownLatch startDate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for(int i=0;i<nThreads;i++)
{
Thread t = new Thread(){
@Override
public void run()
{
try {
startDate.await();
try {
task.run();
}finally {
endGate.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t.start();
}
long start = System.nanoTime();
startDate.countDown();
endGate.await();
long end = System.nanoTime();
return end-start;
}
网上自己找的demo
老板和工人,老板要等工人全部完成工作后验收
class worker implements Runnable
{
private CountDownLatch countDownLatch;
private String name;
public worker(String name, CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
this.name = name;
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
this.doWork();
try{
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
}catch(InterruptedException ie){
}
System.out.println(name+"干完了");
this.countDownLatch.countDown();
}
private void doWork()
{
System.out.println(name+"is working");
}
}
class boss implements Runnable
{
CountDownLatch countDownLatch;
public boss(CountDownLatch countDownLatch)
{
this.countDownLatch = countDownLatch;
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
System.out.println("boss is waiting");
try {
this.countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("老板开始检查了");
}
}
public class LatchTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
CountDownLatch work = new CountDownLatch(3);
worker work1 = new worker("work1", work);
worker work2 = new worker("work2", work);
worker work3 = new worker("work3", work);
boss boss1 = new boss(work);
executorService.execute(work1);
executorService.execute(work2);
executorService.execute(work3);
executorService.execute(boss1);
executorService.shutdown();
}
}
- FutureTask
用FutureTask实现老板与长工问题
class Work implements Callable
{
private String name ;
public Work(String name) {
this.name = name;
}
public boolean doWork()
{
System.out.println(name+"is working");
return true;
}
@Override
public Boolean call() throws Exception {
this.doWork();
try{
TimeUnit.SECONDS.sleep(new Random().nextInt(3));
}catch(InterruptedException ie){
}
System.out.println(name+"干完了");
return true;
}
}
class Boss implements Callable
{
public boolean doWork() throws ExecutionException, InterruptedException {
System.out.println("boss is working");
return true;
}
@Override
public Boolean call() throws Exception {
this.doWork();
return true;
}
}
public class FutureTaskTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
Boss boss = new Boss();
Work work1 = new Work("a");
Work work2 = new Work("b");
Work work3 = new Work("c");
FutureTask<Boolean> futureTask = new FutureTask<Boolean>(boss);
FutureTask<Boolean> workTask1 = new FutureTask<Boolean>(work1);
FutureTask<Boolean> workTask2 = new FutureTask<Boolean>(work2);
FutureTask<Boolean> workTask3 = new FutureTask<Boolean>(work3);
executorService.execute(workTask1);
executorService.execute(workTask2);
executorService.execute(workTask3);
try {
workTask1.get();
workTask2.get();
workTask3.get();
if(workTask1.isDone()&&workTask2.isDone()&&workTask3.isDone())
{
executorService.execute(futureTask);
futureTask.get();
}
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}
}
网友评论