Semaphore
Semaphore是一个计数信号量,常用于限制客访问某些资源的线程数目,相当于一种用来控制并发量的共享锁
- 用于多个共享资源互斥使用
- 用于控制并发线程数
Semaphore的简单使用
import java.util.concurrent.Semaphore;
public class Demo {
// 创建只能同时有5个线程的信道
static Semaphoresp = new Semaphore(5);
public static void main(String args[]){
for (int i=0; i<1000; i++){
new Thread(){
@Override
public void run() {
try {
sp.acquire(); //抢信号量、就是在加锁
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
queryDB("localhost:3006");
sp.release(); //释放信号量,就是解锁
}
}.start();
}
}
// 模拟操作DB
public static void queryDB(String url){
System.out.println("query " + url);
}
}
使用AQS实现Semaphore
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class DemoSemaphore {
private Sync sync;
public DemoSemaphore(int permits){
this.sync = new Sync(permits);
}
//获取信号量
public void acquire(){
sync.acquireShared(1);
}
//释放信号量
public void release(){
sync.releaseShared(1);
}
//AQS里面有很多没有实现的方法,要使用AQS,一个创建AQS的实例,并且重写方法
class Sync extends AbstractQueuedSynchronizer {
private int permits;
public Sync(int permits){
this.permits = permits;
}
//这里不需要考虑入队列、出队列, 这些都是不带try的方法中实现了,作为了公共的业务逻辑
@Override
protected int tryAcquireShared(int arg) {
//获取锁的线程,最多不能超过n个
int c = getState(); //state此处表示信号量获取的个数
int nextc = c + arg; //arg一般是1
if (nextc <= permits){
if (compareAndSetState(c, nextc))
return 1;
}
return -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
for (;;){
int c = getState();
if (c == 0) return false;
int nextc = c - arg;
if (compareAndSetState(c, nextc)){
return true;
}
}
}
//同样是失败,release的时候,要自旋,而acquire却没有
}
}
CountDownLatch
CountDownLatch是一个倒计数器同步工具类,用来协调多个线程之间的同步
- 让一些线程阻塞直到另一些线程完成一系列操作后才唤醒。
- 通过调用await方法让线程进入阻塞状态等待倒计时0时唤醒。
- 通过线程调用countDown方法让倒计时中的计数器减去1,当计数器为0时,会唤醒哪些因为调用了await而阻塞的线程。
CountDownLatch简单使用
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Demo {
/*
火箭起飞前,有很多检查需要做,每项检查需要的时间不同,
完成全部10项检查后,火箭才能点火
*/
public static void test01() throws InterruptedException {
CountDownLatch ctl = new CountDownLatch(10);
//任务在异步的执行
for (int i=0; i<10; i++){
int number = i;
new Thread(){
@Override
public void run() {
int randomInt = new Random().nextInt(10);
try {
Thread.sleep(randomInt * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(">>>>" + number);
//当任务执行完,将count-1
ctl.countDown();
}
}.start();
}
//通过await来阻塞住
System.out.println("主线程开始等待。。。");
ctl.await();
System.out.println("点火...");
}
/*
预备,跑!!!
*/
public static void test02() throws InterruptedException {
CountDownLatch ctl = new CountDownLatch(1);
for (int i=0; i<6; i++){
int number = i;
new Thread(){
@Override
public void run() {
System.out.println(number + " is redy...");
ctl.await();
System.out.println(String.format("运动员%d起跑", number));
}
}.start();
}
System.out.println("预备");
Thread.sleep(3000);
ctl.countDown();
System.out.println("跑!!!");
}
public static void main(String args[]) throws InterruptedException {
test02();
}
}
使用AQS实现CountDownLatch
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class DemoCountDownLatch {
private Sync sync;
public DemoCountDownLatch(int count){
this.sync = new Sync(count);
}
//释放共享锁
//state初始值为count,只有在state减为0的时候,才能释放锁成功
public void countDown(){
sync.releaseShared(1);
}
//获取共享,只有在state=0时,才能获取锁成功
public void await(){
sync.acquireShared(1);
}
class Sync extends AbstractQueuedSynchronizer{
public Sync(int count){
setState(count); //state用来记录倒计数
}
@Override
protected int tryAcquireShared(int arg) {
return getState() == 0 ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
for (;;){
int c = getState();
if (c == 0)
return false;
int nextc = c - arg;
//减1需要使用CAS操作
if (compareAndSetState(c, nextc)){
return nextc == 0;
}
}
}
}
}
CyclicBarrier
CyclicBarrier循环栅栏,底层是通过ReentrantLock以及Condition中的await和signal实现
- 让线程到达一个屏障时被阻塞,当达到屏障数量后,线程才会继续执行
- 它通过调用await方法让线程进入屏障
CyclicBarrier简单使用
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.LockSupport;
public class Demo {
public static void main(String args[]){
// 按4个一组执行
CyclicBarrier barrier = new CyclicBarrier(4);
//传入一个Runnable,打印栅栏
for (int i=0; i< 100; i++){
new Thread(){
@Override
public void run() {
barrier.await(); //
System.out.println("上到摩天轮...");
}
}.start();
LockSupport.parkNanos(1000 * 1000 * 1000L);
}
}
}
使用ReentrantLock以及Condition实现CyclicBarrier
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class DemoCyclicBarrier {
//condition实现
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
//一个批次的大小
private final int parties;
//记录当前一轮有多少个线程等待
private int count = 0;
//全局年代
private Object generation = new Object();
public DemoCyclicBarrier(int parties){
if (parties <=0)
throw new IllegalArgumentException();
this.parties = parties;
}
//进入下一轮等待,叫做进入下一个 年代
public void nextGeneration(){
count = 0;
generation = new Object();
condition.signalAll();
}
public void await(){
lock.lock();
try {
Object myGeneration = generation;
int index = ++count;
//若当前一轮,集满
if (index == parties){
//进入下一轮 : count =0, 唤醒所有线程
nextGeneration();
return;
}
for (;;){
//没有集满,挂起线程
try {
condition.await(); //await方法用pak来实现的
} catch (InterruptedException e) {
e.printStackTrace();
}
//什么时候应该让线程结束等待???
if (myGeneration != generation)
return;
}
}finally {
lock.unlock();
}
}
}
fork join框架
- ForkJoinPool是ExecutorService的实现类,是一种特殊的线程池
- 主要是对一个任务的拆分与合并
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class Demo {
// 有一堆大文件,每一行放的是一个URL,你需要进行HTTP
// 用线程池,任务的拆分,很多文件,逻辑非常零散,跨多个线程
static ArrayList<String> urls = new ArrayList<String>(){
{
add("http://www.baidu.com");
add("http://www.sina.com");
// ....
}
};
static ForkJoinPool forkJoinPool = new ForkJoinPool(3,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
true);
//模拟网络请求
public static String doRequest(String url){
return "Kody ... test ... " + url + "\n";
}
public static void main(String args[]) throws ExecutionException, InterruptedException {
Job job = new Job(urls, 0, urls.size());
ForkJoinTask<String> forkJoinTask = forkJoinPool.submit(job);
String result = forkJoinTask.get();
System.out.println(result);
}
//使用ForkJoin最核心的内容,就是定义 递归任务,
//定义递归任务,即定义如何对Task进行拆分,对结果进行汇总
//定义就放在compute方法中
static class Job extends RecursiveTask<String>{
List<String> urls;
int start;
int end;
public Job(List<String> urls, int start, int end){
this.urls = urls;
this.start = start;
this.end = end;
}
@Override
protected String compute() {
int count = end - start; //计算任务大小
//若任务比较小,就直接执行, // 10
if (count <=10){
String result = "";
for (int i = start; i< end; i++){
String response = doRequest(urls.get(i));
result += response;
}
return result;
}else{
//否则,拆分任务
int x = (start + end) / 2;
Job job1 = new Job(urls, start, x);
job1.fork();
Job job2 = new Job(urls, x, end);
job2.fork();
//汇总结果
String result = "";
result += job1.join();
result +=job2.join();
return result;
}
}
}
}
Future、FutureTask
Future只是一个接口,FutureTask是实现了RunnableFuture
- Future呈现的是异步计算的结果。
- Future中的方法提供了检查计算是否已经完成,并且等待计算的结果,还能够重新获取计算记得结果。
- 已经完成的时候只能使用get()方法获取结果,如果有需要的话,可以一直阻塞等待结果,直到结果已经准备好了
Callable
有两种创建线程的方法-一种是通过创建Thread类,另一种是通过使用Runnable创建线程。但是,Runnable缺少的一项功能是,当线程终止时(即run()完成时),我们无法使线程返回结果。为了支持此功能,Java中提供了Callable接口
- 为了实现Runnable,需要实现不返回任何内容的run()方法,而对于Callable,需要实现在完成时返回结果的call()方法。请注意,不能使用Callable创建线程,只能使用Runnable创建线程
- 另一个区别是call()方法可以引发异常,而run()则不能
- 为实现Callable而必须重写call方法
FutureTask 与 Callable 简单使用
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class Demo {
public static void main(String args[]) throws InterruptedException, ExecutionException {
//使用:用来包裹一个callab实例,得到的futureTask实例可以传入Thread()
CallableTask task = new CallableTask();
FutureTask<String> future = new FutureTask<>(task);
new Thread(future).start();
String result = future.get(); //get方法会阻塞
System.out.println(result);
//一个futureTask实例,只能使用一次
//同时说明,这个任务,从头到尾只会被一个线程执行
new Thread(future).start();
}
}
class CallableTask implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println(">>>执行任务。。。");
//模拟耗时
LockSupport.parkNanos(1000 * 1000 *1000 * 5L);
return "success";
}
}
模拟FutureTask原理
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
public class DemoFutureTask<T> implements Runnable{
public DemoFutureTask(Callable<T> call){
this.call = call;
}
private Callable<T> call;
T result;
//Runner,用来实现抢执行的权限
AtomicReference<Thread> runner = new AtomicReference<>();
//等待队列
LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
//任务状态
private volatile int state = NEW;
private static final int NEW = 0;
private static final int RUNNING = 1;
private static final int FINISHED = 2;
@Override
public void run() {
if (state != NEW ||
!runner.compareAndSet(null, Thread.currentThread())){
return;
}
state = RUNNING;
try {
result = call.call();
} catch (Exception e) {
e.printStackTrace();
}finally {
state = FINISHED;
}
while (true){
Thread th = waiters.poll();
if (th == null){
break;
}
LockSupport.unpark(th);
}
}
public T get(){
if (state != FINISHED){
waiters.offer(Thread.currentThread());
}
//挂起线程
while (state!=FINISHED){
LockSupport.park();
}
return result;
}
}
网友评论