读写锁
写锁:也叫独占锁,一次只能被一个线程占有。
读锁:也叫共享锁,该锁可以被多个线程占有。
ReadWriteLock
,即读写锁,正如它的名字一样,它包含了读锁和写锁,一个用于只读操作,一个用于写入操作,我们先来看看JDK文档中对它的说明。
创建一个读写锁对象:
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
加读锁和解读锁:
readWriteLock.readLock().lock();
readWriteLock.readLock().unlock();
加写锁和解写锁:
readWriteLock.writeLock().lock();
readWriteLock.writeLock().unlock();
数据读写时可以使用读写锁来保证线程安全,示例代码如下:
package com.wunian.juc.rwlock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 独占锁(写锁)一次只能被一个线程占有
* 共享锁(读锁)一个锁可以被多个线程占有
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCacheLock myCache=new MyCacheLock();
//模拟线程
//写
for (int i=1;i<=5;i++){
final int tempInt=i;
new Thread(()->{
myCache.put(tempInt+"",tempInt+"");
},String.valueOf(i)).start();
}
//读
for (int i=1;i<=5;i++){
final int tempInt=i;
new Thread(()->{
myCache.get(tempInt+"");
},String.valueOf(i)).start();
}
}
}
//加锁后的读写操作
class MyCacheLock{
private volatile Map<String,Object> map=new HashMap<>();
//读写锁
private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
//读:可以被多个线程同时读
public void get(String key){
//锁一定要匹配,否则可能导致死锁
readWriteLock.readLock().lock();//读锁,被多个线程同时持有
try {
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o=map.get(key);
System.out.println(Thread.currentThread().getName()+"读取结果"+o);
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();//解锁
}
}
//写:应该保证原子性,不应该被打扰,写线程写入的过程中如果不加锁,会被读线程打扰
public void put(String key,Object value){
readWriteLock.writeLock().lock();//写锁,只能被一个线程占有
try {
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();//解锁
}
}
}
阻塞队列
队列:是一种先进先出的数据结构。
栈:是一种后进先出的数据结构。
阻塞队列是一种队列,我们先来看看JDK文档中对它的说明。
阻塞队列在什么情况下一定会被阻塞?
- 当队列是满的,如果还要往它里面添加元素就会被阻塞。
- 当队列是空的,如果还要取它里面的元素就会被阻塞。
什么时候使用阻塞队列?
当编写多线程程序时,对于线程之间的通信,不需要关心唤醒的情况下可以使用阻塞队列。
阻塞队列是新知识吗?
List、Set这些集合类我们都学过,阻塞队列和它们是一样的,我们可以来看一张集合类的关系图。
由上图可知,BlockingQueue是Queue的子类,而Queue与List、Set一样都是Collection类的子类。
ArrayBlockingQueue
是BlockingQueue的子类,它含有四组对元素的插入和获取的API,我们可以用表格来对比一下。
方法 | 会抛出异常 | 返回布尔值,不会抛出异常 | 延时等待 | 一直等待 |
---|---|---|---|---|
插入 | add() | offer(e) | offer(e,time) | put() |
取出 | remove() | poll() | poll(time) | take() |
检查 | element() | peek() | - | - |
四组API的示例代码如下:
package com.wunian.juc.queue;
import com.sun.scenario.effect.impl.sw.java.JSWBlend_SRC_OUTPeer;
import java.sql.Time;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 阻塞队列
*/
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue blockingQueue=new ArrayBlockingQueue(3);
//队列已满出现的四种情况:报错、抛弃不报错、一直等待、超时等待!
//java.lang.IllegalStateException: Queue full
blockingQueue.add("a");
blockingQueue.add("b");
blockingQueue.add("c");
//blockingQueue.add("d");//会抛出队列已满的异常
System.out.println(blockingQueue.element());//检测第一个元素,输出
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
//返回布尔值,不抛出异常
// System.out.println(blockingQueue.offer("a"));//true
// System.out.println(blockingQueue.offer("b"));//true
// System.out.println(blockingQueue.offer("c"));//true
// //尝试等待三秒,三秒钟后会失败,返回false
// //System.out.println(blockingQueue.offer("d", 3L,TimeUnit.SECONDS));
// System.out.println(blockingQueue.peek());//检测第一个元素,输出
//
// System.out.println("================================");
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// //虽然是空的,还是会等待3秒,然后返回null
// System.out.println(blockingQueue.poll(3L, TimeUnit.SECONDS));
// blockingQueue.put("a");
// blockingQueue.put("b");
// blockingQueue.put("c");
// //System.out.println("准备放入第四个元素");
// //blockingQueue.put("d");//队列满了会一直等,并且会阻塞
//
// System.out.println("===============================");
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());//队列空了会一直等,并且阻塞
}
}
同步队列
SynchronousQueue
,即同步队列,是一种特殊的阻塞队列,因为它只有一个容量,并且每进行一个put操作,就需要有一个take操作。
示例代码如下:
package com.wunian.juc.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* 同步队列
* 只能存放一个值,一存一取(存一个必须取一个才能继续存)
*/
public class SynchronuseQueueDemo {
public static void main(String[] args) {
//特殊的阻塞队列
BlockingQueue<String> blockingQueue=new SynchronousQueue<>();
//存
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put a");
blockingQueue.put("a");
System.out.println(Thread.currentThread().getName()+" put b");
blockingQueue.put("b");
System.out.println(Thread.currentThread().getName()+" put c");
blockingQueue.put("c");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
//取
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" "+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" "+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" "+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
}
}
线程池
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。
为什么要使用线程池?
实现线程复用,线程复用可以提高线程的使用效率,保证内核的充分利用,防止过分调度。
线程池的三大方法
创建只有一个线程的线程池:
ExecutorService pool= Executors.newSingleThreadExecutor();
创建固定线程数的线程池:
ExecutorService pool=Executors.newFixedThreadPool(3);
创建可伸缩的线程池:
ExecutorService pool=Executors.newCachedThreadPool();
线程池的七大参数
我们先看看上面三大方法的源码:
//可伸缩的线程池
new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 约等于21亿
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
//固定线程数的线程池
new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
//单线程的线程池
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
由以上代码可以知道,三大方法的底层还是创建的ThreadPoolExecutor
对象,这也就是为什么阿里巴巴开发手册要求不能使用三大方法,而必须直接通过创建ThreadPoolExecutor
对象来创建线程池的原因。
我们再来看看定义了七大参数的构造器源码:
public ThreadPoolExecutor(int corePoolSize, // 核心池线程数大小 (常用)
int maximumPoolSize, // 最大的线程数大小 (常用)
// 超时等待时间,超过一定时间会把核心线程数以外的闲置线程关闭(常用)
long keepAliveTime,
TimeUnit unit, // 时间单位 (常用)
BlockingQueue<Runnable> workQueue, // 阻塞队列(常用)
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略(常用)) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
由以上代码可知,七大参数为:
-
int corePoolSize
:核心池线程数,即线程池固定开启的线程数量。 -
int maximumPoolSize
:最大线程数,当核心池线程全部都在处理任务,阻塞队列中的任务也已经满了,一旦增加了新任务,线程池会开启一个新的线程来处理,开启的线程数量最多为最大线程数,超过这个数量,将执行拒绝策略。 -
long keepAliveTime
:超时等待时间,超过这个时间会将闲置的线程关闭,最后只保留核心池线程数量的线程。 -
TimeUnit unit
:keepAliveTime参数的时间单位。 -
BlockingQueue< Runnable > workQueue
:阻塞队列,用来存放等待线程执行的任务。 -
ThreadFactory threadFactory
:线程工厂,默认值:Executors.defaultThreadFactory()
-
RejectedExecutionHandler handler
:拒绝策略,当线程池中执行的线程数量达到最大线程数,新增的任务将根据拒绝策略进行处理。
除了ThreadFactory
这个参数使用的是系统默认的线程工厂外,其它六个参数都是必须掌握的。
四大拒绝策略
七大参数中的RejectedExecutionHandler
参数,指的就是拒绝策略,源码中为我们提供了四大拒绝策略。
-
ThreadPoolExecutor.AbortPolicy()
: 抛出异常,丢弃任务。 -
ThreadPoolExecutor.DiscardPolicy()
:不抛出异常,丢弃任务。 -
ThreadPoolExecutor.DiscardOldestPolicy()
: 尝试获取任务,不一定执行。 -
ThreadPoolExecutor.CallerRunsPolicy()
:哪来的去哪里找对应的线程执行。
最大线程数应该如何设置?
- CPU密集型:根据CPU的处理器数量来决定,这样能够保证最大效率。
获取电脑CPU核数:Runtime.getRuntime().availableProcessors();
- IO密集型:例如有50个线程都是进程操作大IO资源,比较耗时,这时就要考虑最大线程数一定要大于这个常用IO的任务数,即最大线程数要大于50。
最终代码如下:
package com.wunian.juc.threadpool;
import java.util.concurrent.*;
/**
* 线程池
*/
public class ThreadPoolDemo {
public static void main(String[] args) {
ExecutorService pool=new ThreadPoolExecutor(
2,//核心线程数大小(常用)
//最大线程数大小(常用)
Runtime.getRuntime().availableProcessors(),//获取电脑CPU核数
//超时等待时间(常用,超过一定时间会把核心线程数以外的闲置线程关闭)
3L,
//时间单位(常用)
TimeUnit.SECONDS,
//阻塞队列(常用)
new LinkedBlockingDeque<>(3),
//线程工厂
Executors.defaultThreadFactory(),
//拒绝策略(常用,4种)
new ThreadPoolExecutor.CallerRunsPolicy()
);
try {
//线程池的使用方式
for(int i=1;i<=100;i++){
pool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//使用完毕后需要关闭!
pool.shutdown();
}
}
}
四个函数式接口
函数式接口在java.util.function
包下面,所有的函数式接口都可以用来简化编程模型,都可以使用lambda表达式简化。
必须掌握的四个函数式接口
-
Function
:有一个输入参数,有一个输出参数。 -
Consumer
:有一个输入参数,没有输出参数。 -
Supplier
:没有输入参数,只有输出参数。 -
Predicate
:有一个输入参数,判断是否正确
lambda表达式语法格式可以简单概括为:(参数)->{方法体}
,示例代码如下:
package com.wunian.juc.function;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
* 函数式接口是现在必须掌握且精通的
*所有的函数式接口都可以使用lambda表达式简化
* lambda表达式是java8必须掌握的
*
*/
public class FunctionInterfaceDemo {
public static void main(String[] args) {
/*Function<String,Integer> function=new Function<String,Integer>(){
@Override
public Integer apply(String s) {
return s.length();
}
};*/
//Function lambda表达式格式 (参数)->{方法体}
Function<String,Integer> function=(str)->{return str.length();};
System.out.println(function.apply("1234565"));
/*Predicate<String> predicate=new Predicate<String>() {
@Override
public boolean test(String s) {
return s.isEmpty();
}
};*/
//Predicate lambda表达式格式 (参数)->{方法体}
Predicate<String> predicate=str->{return str.isEmpty();};
System.out.println(predicate.test("qqq"));
/*Supplier<String> supplier=new Supplier<String>() {
@Override
public String get() {
return "hello word";
}
};*/
//Supplier lambda表达式格式 (参数)->{方法体}
Supplier<String> supplier=()->{return "hello juc";};
System.out.println(supplier.get());
/*Consumer<String> consumer=new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};*/
//Consumer lambda表达式格式 (参数)->{方法体}
Consumer<String> consumer=(s)->{
System.out.println(s);
};
consumer.accept("hello consumer");
}
}
Stream流式计算
数据库和集合都是用来存数据的,我们可以把计算和处理数据交给Stream。先来看看JDK文档中对它的说明。
Stream
比如现在有一个集合,需要按条件用户筛选以下条件:
1、id 为偶数
2、年龄大于24
3、用户名大写 映射
4、用户名倒排序
5、输出一个用户
并且只能用一行代码完成!
使用Stream流式计算,这个问题很好处理,代码如下:
package com.wunian.juc.stream;
import java.util.Arrays;
import java.util.List;
/**
* 流式计算
* 数据库、集合:存数据的
*计算和处理数据交给Stream
*/
public class StreamDemo {
public static void main(String[] args) {
User u1=new User(1,"a",23);
User u2=new User(2,"b",20);
User u3=new User(3,"c",26);
User u4=new User(4,"d",30);
User u5 =new User(5,"e",21);
//存储
List<User> users= Arrays.asList(u1,u2,u3,u4,u5);
//计算等操作交给流
//forEach(消费者类型接口)
users.stream()
.filter(u->{return u.getId()%2==0;})
.filter(u->{return u.getAge()>24;})
.map(u->{return u.getName().toUpperCase();})
.sorted((o1,o2)->{return o2.compareTo(o1);})
.limit(1)
.forEach(System.out::println);
}
}
ForkJoin分支合并
ForkJoin采用了分治算法思想,在必要的情况下,将一个大任务,进行拆分(fork) 成若干个子任务(拆到不能再拆,这里就是指我们制定的拆分的临界值),再将一个个小任务的结果进行join汇总。
MapReduce:input->split->map->reduce->output
主要就是两步:
1、任务拆分
2、结果合并
前提
使用ForkJoin的前提是在大数据量的情况下,如果数据量很小,ForkJoin的效率还不如不用来的快。
ForkJoin的工作原理
假如两个CPU上有不同的任务,这时候B已经执行完,A还有任务等待执行,这时候B就会将A队尾的任务偷过来,加入自己的队列中,这叫做工作窃取,ForkJoin的底层维护的是一个双端队列
好处:处理效率高。
坏处:可能产生资源争夺。
ForkJoin的工作原理
我们可以使用求和计算测试一下ForkJoin,这就需要用到
RecursiveTask
类了,先来看看它的JDK文档。RecursiveTask
先来创建一个类继承
RecursiveTask
,重写其compute
方法,代码如下:
package com.wunian.juc.forkjoin;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* 求和计算
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
private static final Long tempLong=10000L;//临界值,只要超过了这个值,ForkJoin效率就会更高
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
//计算方法
@Override
protected Long compute() {
if((end-start)<=tempLong){
Long sum=0L;
for(Long i=start;i<=end;i++){
sum+=i;
}
return sum;
}else{//超过临界值,用第二种方式
long middle=(end+start)/2;
//ForkJoin实际上是通过递归来实现
ForkJoinDemo right=new ForkJoinDemo(start,middle);
right.fork();//压入线程队列
ForkJoinDemo left=new ForkJoinDemo(middle+1,end);
left.fork();//压入线程队列
//获得结果join,会阻塞等待结果
return right.join()+left.join();
}
}
}
创建一个测试类,分别测试普通方法、ForkJoin方法、并行流计算方法的计算效率,代码如下:
package com.wunian.juc.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
/**
* 测试forkjoin
*/
public class MyTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//test1();//6345
//test2();//13476
test3();//902
}
//普通方法
public static void test1() {
long sum=0L;
long start=System.currentTimeMillis();
for(Long i=0L;i<=10_0000_0000L;i++){
sum+=i;
}
long end=System.currentTimeMillis();
System.out.println("times:"+(end-start)+" rs=>:"+sum);
}
//forkjoin(计算大数据量时才有效果,数据较小时可能还不如普通方法快)
public static void test2() throws ExecutionException, InterruptedException {
long start=System.currentTimeMillis();
ForkJoinPool forkJoinPool=new ForkJoinPool();
ForkJoinDemo forkJoinWork=new ForkJoinDemo(0L,10_0000_0000L);
ForkJoinTask<Long> submit=forkJoinPool.submit(forkJoinWork);
Long sum=submit.get();
long end=System.currentTimeMillis();
System.out.println("times:"+(end-start)+" rs=>:"+sum);
}
//并行流计算
public static void test3() {
long sum=0L;
long start=System.currentTimeMillis();
sum=LongStream.rangeClosed(0L,10_0000_0000L).parallel().reduce(0,Long::sum);//并行计算
long end=System.currentTimeMillis();
System.out.println("times:"+(end-start)+" rs=>:"+sum);
}
}
最后的测试结果表明,在计算数据量较大时,并行流计算方法是最快的,ForkJoin方法次之,普通方法最慢。在计算数据量很小时,ForkJoin方法和并行流计算方法反而比普通方法慢,再次验证了使用ForkJoin的前提是在大数据量的情况下的这个结论。
异步回调
以往我们常常会使用callable来进行异步调用,但是callable没有返回值。与之相比,Future可以有返回值,也可以没有返回值。我们可以使用Future的子类completableFuture
来测试一下,代码如下:
package com.wunian.juc.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* 异步回调 callable没有返回值,使用Future
*
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//没有返回值
/* CompletableFuture<Void> completableFuture=CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"没有返回值!");
});
System.out.println("111111");
completableFuture.get();*/
//有返回值
CompletableFuture<Integer> completableFuture=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"=>supply Async!");
int i=10/0;
return 1024;
});
System.out.println(completableFuture.whenComplete((t,u)->{
System.out.println("t=>"+t);//正确结果
System.out.println("u=>"+u);//错误信息
}).exceptionally(e->{//失败,如果错误就返回错误的结果
System.out.println("e:"+e.getMessage());
return 500;
}).get());
}
}
由以上代码我们可以知道,CompletableFuture不但有返回值,并且当异步调用过程中如果出现异常,连错误信息也会返回,这样就可以很方便的做一些异常处理,显然这点要比callable强大很多。
网友评论