CountDownLatch
package com.example.concurrent.juc;
import java.util.concurrent.CountDownLatch;
/**
* 相当于一个终点线
*/
public class CountDownLatchDemo implements Runnable {
private CountDownLatch begin;
//观察end的count为0时,会使当前线程退出阻塞
private CountDownLatch end;
//用来模拟子线程执行的任务
private int index = 10;
public CountDownLatchDemo(CountDownLatch begin, CountDownLatch end) {
this.begin = begin;
this.end = end;
}
//这里要使用同步synchronized
synchronized public void run(){
try {
begin.await();
index--;
System.out.println(Thread.currentThread().getName() + ":执行完任务了, index: " + index);
} catch (Exception e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch begin = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(10);
CountDownLatchDemo cdlDemo = new CountDownLatchDemo(begin, end);
System.out.println("所有线程开始执行任务....");
for (int i = 0; i < 10; i++) {
new Thread(cdlDemo).start();
}
//开始让所有线程工作
begin.countDown();
//
end.await();
System.out.println("所有线程完成任务,可以开始做其它想做的事 了....");
}
}
CyclicBarrier
package com.example.concurrent.juc;
import java.util.concurrent.CyclicBarrier;
/**
* //相当于一个中间等待线
*/
public class CyclicBarrierDemo implements Runnable {
private CyclicBarrier cyclicBarrier;
private int index ;
public CyclicBarrierDemo(CyclicBarrier cyclicBarrier, int index) {
this.cyclicBarrier = cyclicBarrier;
this.index = index;
}
@Override
public void run() {
try {
System.out.println("index: " + index);
cyclicBarrier.await();
System.out.println(index+"执行");
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(11);
for (int i = 0; i < 10; i++) {
new Thread(new CyclicBarrierDemo(cyclicBarrier, i)).start();
}
cyclicBarrier.await();
System.out.println("全部到达屏障....");
}
}
Exchanger
package com.example.concurrent.juc;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Exchanger;
/**
* 当一个线程运行到exchange()方法时会阻塞,另一个线程运行到exchange()时,二者交换数据,然后执行后面的程序。
*/
public class ExchangerDemo implements Runnable {
private Exchanger<Map<String, String>> exchanger;
private Map<String, String> info;
public ExchangerDemo (Exchanger<Map<String, String>> exchanger, Map<String, String> info) {
this.exchanger = exchanger;
this.info = info;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "交换前的值是:" + info.toString() );
Map<String, String> result = exchanger.exchange(info);
System.out.println(Thread.currentThread().getName() + "交换后的值是:" + result.toString() );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Map<String, String> aMap = new HashMap<String, String>();
Map<String, String> bMap = new HashMap<String, String>();
aMap.put("a", "aaa");
aMap.put("b", "bbb");
aMap.put("c", "ccc");
bMap.put("1", "111");
bMap.put("2", "222");
bMap.put("3", "333");
Exchanger<Map<String, String>> exchanger = new Exchanger<Map<String, String>>();
new Thread(new ExchangerDemo(exchanger, aMap)).start();
new Thread(new ExchangerDemo(exchanger, bMap)).start();
}
}
Future
package com.example.concurrent.juc;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
/**
*异步线程返回值
*/
public class FutureDemo {
public static void main(String[] args) throws Exception {
Future<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() {
return 1+2+3+4;
}
});
new Thread((Runnable) task).start();
System.out.println("start.........");
TimeUnit.SECONDS.sleep(2);
System.out.println("result: " + task.get());
}
}
Semaphore
package com.example.concurrent.juc;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
*可以控制线程执行的数量
* 线程执行前用acquire()方法获得信号,执行完通过release()方法归还信号量。如果可用信号为0,acquire就会造成阻塞,等待release释放信号。作用就是只让指定个数的线程(随机选择或先来后到)并行。
*/
public class SemaphoreDemo implements Runnable {
private Semaphore sema;
public SemaphoreDemo (Semaphore semaphore) {
this.sema = semaphore;
}
public void run() {
try {
sema.acquire();
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + ": 执行完: " + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
sema.release();
}
}
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
SemaphoreDemo semaphoreDemo = new SemaphoreDemo(semaphore);
for (int i = 0; i < 10; i++) {
new Thread(semaphoreDemo).start();
}
}
}
AtomicInteger
package com.example.concurrent.juc;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 保证原子性
*/
public class AtomicDemo {
private static int CLIENT_COUNT = 5000;
private static AtomicInteger count = new AtomicInteger(0);
private static ExecutorService executorService = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
private final static CountDownLatch countDownLatch = new CountDownLatch(CLIENT_COUNT);
public static void main(String[] args) throws Exception {
testAtomicInteger();
}
private static void testAtomicInteger() throws Exception {
for (int i = 0; i < CLIENT_COUNT; i++) {
executorService.execute(() -> {
add();
// count每加 1,进行减 1 计数
countDownLatch.countDown();
});
}
// 等待线程池所有任务执行结束
countDownLatch.await();
executorService.shutdown();
System.out.println("ConcurrencyDemo:" + count);
}
private static void add() {
count.incrementAndGet();
}
}
网友评论