java 高并发
基本概念
- 并发: 多个线程操作相同的资源,保证线程安全,合理使用资源
- 高并发: 服务能同时处理很多请求,提升程序性能
并发编程的基础
cpu多级缓存
cpu频率太快了,快到主存跟不上这样在处理器时钟周期内,cpu常常需要等待主存,浪费资源。
所以cache的出现,是为了缓解cpu和内存之间的速度不匹配的问题。
cpu多级缓存的意义
- 时间局部性: 如果某个数据被访问,那么在不久的将来,该数据很可能再次被访问。
- 空间局部性: 如果某个数据被访问,那么与它相邻的数据很快也可能也被访问。
缓存一致性(MESI)
用于保证多个cpu cache之间的缓存数据的一致性
- M: modified 被修改
- 代表该缓存行只被缓存在cpu的缓存中,并且是被修改过的,因此其与主存数据不一致,需要被写回主存。
- 当写回主存之后,状态变为E
- E: exclusive 独享
- 只被缓存在某cpu中,与主存数据一致,该状态的缓存可在任何时候被其他缓存访问时变为共享状态S,当被修改时变为M状态
- S: shared 共享
- 该状态标识该缓存行可能被多个cpu进行缓存,且各缓存中数据与主存是一致的,当有一个cpu修改该缓存的时候,其他cpu的该缓存可以被作废,变为I状态
- I: invalid 无效
- 代表该缓存行无效
乱序执行优化
- 处理其为提高运算速度而做出违背代码原有顺序的优化
juc 之 并发容器
- ArrayList -> CopyOnWriteArrayList
- HashSet, TreeSet -> CopyOnWriteArraySet, ConcurrentSkipListSet
- HashMap, TreeMap -> ConcurrentHashMap, ConcurrentSkipListMap
ConcurrentHashMap vs ConcurrentSkipListMap
ConcurrentSkipListMap key是有序的,支持更高的并发
juc 之 aqs
aqs介绍
AbstractQueuedSynchronizer - AQS
- 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
- 利用了一个int类型表示状态
- 使用方法是继承
- 子类通过继承并通过实现它的方法管理其状态(acquire,release)的方法操纵状态
- 可以同时实现排他锁和共享锁模式(独占、共享)
aqs 同步组件
- CountDownLatch
- Semaphore
- CyclicBarrier
- ReentrantLock
- Condition
- FutureTask
CountDownLatch
@Slf4j
public class Demo001CountDownLatch {
private final static int count = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
final int threadNum = i;
exec.execute(()->{
try {
test(threadNum);
} catch (InterruptedException e) {
log.error("exception",e);
} finally {
latch.countDown();
}
});
}
latch.await();
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
Semaphore
@Slf4j
public class Demo003Semaphore {
private final static int count = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < count; i++) {
final int threadNum = i;
exec.execute(()->{
try {
semaphore.acquire(); // 获取执行许可
test(threadNum);
semaphore.release(); // 释放许可
} catch (InterruptedException e) {
log.error("exception",e);
} finally {
}
});
}
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
CyclicBarrier
CyclicBarrier vs CountDownLatch
- CountDownLatch 的计数器只能使用一次,
而CyclicBarrier的计数器可以重置
code:
@Slf4j
public class Demo004CyclicBarrier2 {
// CyclicBarrier 可以给一个runnable,满足条件时优先执行这个runnable
private static CyclicBarrier barrier = new CyclicBarrier(5,()->{
log.info("call back is running");
});
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
service.execute(()->{
try {
race(threadNum);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
private static void race(int threadNum) throws InterruptedException, BrokenBarrierException, TimeoutException {
Thread.sleep(1000);
log.info("{} is ready",threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (BrokenBarrierException e) {
log.warn("BrokenBarrierException");
} catch (TimeoutException e) {
log.warn("TimeoutException");
}
log.info("{} continue",threadNum);
}
}
ReentrantLock 与 锁
ReentrantLock与synchronized的区别
- 可重入性
- 两者都可重入
- 锁的实现
- synchronized 依赖于jvm实现
- ReentrantLock 通过jdk实现
- 性能区别
- 在功能 synchronized 可实现的情况下官方推荐 synchronized
- ReentrantLock 需要手动释放锁
- ReentrantLock 独有的功能
- 可指定公平锁还是非公平锁
- 提供了一个Condition类,可分组唤醒需要唤醒的线程
- 提供能够中断等待锁的线程的机制,lock.lockInterruptibly()
简单使用
@Slf4j
public class Demo005Lock {
public static int clientTotal = 5000;
public static int threadTotal = 500;
public static int count = 0;
private final static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(clientTotal);
Semaphore semaphore = new Semaphore(threadTotal);
for (int i = 0; i < clientTotal; i++) {
service.execute(()->{
try {
semaphore.acquire();
add();
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
});
}
latch.await();
service.shutdown();
log.info("finish,count:{}",count);
}
private static void add() {
try {
lock.lock();
count++;
} finally {
lock.unlock();
}
}
}
ReentrantReadWriteLock 读写锁
@Slf4j
public class Demo006ReentrantReadWriteLock {
private final Map<String, Data> map = new HashMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
class Data{
}
}
StampedLock
@Slf4j
public class Demo007StampedLock {
public static int clientTotal = 5000;
public static int threadTotal = 500;
public static int count = 0;
private final static StampedLock lock = new StampedLock();
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(clientTotal);
Semaphore semaphore = new Semaphore(threadTotal);
for (int i = 0; i < clientTotal; i++) {
service.execute(()->{
try {
semaphore.acquire();
add();
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
});
}
latch.await();
service.shutdown();
log.info("finish,count:{}",count);
}
private static void add() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
}
}
Condition
@Slf4j
public class Demo008Condition {
public static void main(String[] args) throws IOException {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(()->{
try{
lock.lock();
log.info("wait signal"); // 1
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal"); // 4
lock.unlock();
}).start();
new Thread(()->{
lock.lock();
log.info("get lock"); // 2
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signal();
log.info("send signal"); // 3
lock.unlock();
}).start();
System.in.read();
}
}
FutureTask
@Slf4j
public class Demo002FutureTask {
public static void main(String[] args) throws InterruptedException, ExecutionException {
FutureTask<String> futureTask = new FutureTask<>(() -> {
log.info("do something in callable");
Thread.sleep(5000);
return "done";
});
new Thread(futureTask).start();
log.info("do something in main");
Thread.sleep(2000);
String result = futureTask.get();
log.info("result :{}",result);
}
}
fork/join
fork/join 任务局限性
- 只能使用fork/join操作来作为同步机制
- 任务不做io操作
- 任务不能抛出异常
简单使用
@Slf4j
public class Demo001ForkJoin extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public Demo001ForkJoin(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
int middle = (start + end) / 2;
Demo001ForkJoin leftTask = new Demo001ForkJoin(start, middle);
Demo001ForkJoin rightTask = new Demo001ForkJoin(middle + 1, end);
leftTask.fork();
rightTask.fork();
int leftResult = leftTask.join();
int rightResult = rightTask.join();
sum = leftResult + rightResult;
}
return sum ;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Demo001ForkJoin task = new Demo001ForkJoin(1, 100);
Future<Integer> result = forkJoinPool.submit(task);
try {
log.info("result:{}",result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
BlockingQueue
操作
- | Throws Exception | Special Value | Blocks | Times Out |
---|---|---|---|---|
insert | add(o) | offer(o) | put(o) | offer(o,timeout,timeunit) |
remove | remove(o) | poll(o) | take(o) | poll(timeout,timeunit) |
Examine | element(o) | peek(o) |
一些实现类
- ArrayBlockingQueue
- 有界的阻塞队列
- 先进先出
- DelayQueue
- 阻塞内部元素,元素必须实现Delayed接口,delayed 接口继承了comparable接口
- 内部元素需要进行排序,一般按照元素过期时间优先级进行排序
- LinkedBlockingQueue
- 大小可变,可指定边界
- 先进先出
- PriorityBlockingQueue
- 有优先级的阻塞队列
- 无边界
- 插入元素需要实现comparable接口,用于优先级的排序
- SynchronousQueue
- 内部仅容纳一个元素
- 内部有元素时其他放入元素的操作被阻塞
线程池
ThreadPoolExecutor
参数:
- corePoolSize : 核心线程数量
- maximumPoolSize : 最大线程数量
- workQueue : 阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响
- keepAliveTime : 线程没有任务执行时最多保持多久中止
- unit : keepAliveTime 的时间单位
- threadFactory: 线程工厂
- rejectHandle: 当拒绝处理任务时的策略
操作:
- execute() : 提交任务
- submit() : 提交任务,能够返回结果
- shutdown() : 关闭线程池,等待任务都执行完
- shutdownNow() : 直接关闭线程池,不等待任务执行完
监控方法:
- getTaskCount() : 线程池已执行和未执行的任务总数
- getCompletedTaskCount() : 已完成的任务数量
- getPoolSize() : 线程池当前的线程数量
- getActiveCount() : 当前线程池中正在执行的线程数量
线程池-Executor框架接口
- Executors.newCachedThreadPool 可缓存的线程池
- Executors.newFixedThreadPool 固定数量的线程池,超过数量则等待
- Executors.newScheduledThreadPool 定长,支持定时周期性的执行任务
- Executors.newSingleThreadExecutor 单线程池化的线程池,只会用当个线程执行任务
newCachedThreadPool
@Slf4j
public class Demo001CachedThreadPool {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
int count = 50;
for (int i = 0; i < count; i++) {
final int num = i;
service.execute(()->{
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("task:{}",num);
});
}
service.shutdown();
}
}
newFixedThreadPool
@Slf4j
public class Demo002FixedThreadPool {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
int count = 50;
for (int i = 0; i < count; i++) {
final int num = i;
service.execute(()->{
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("task:{}",num);
});
}
service.shutdown();
}
}
newSingleThreadExecutor
@Slf4j
public class Demo003SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService service = Executors.newSingleThreadExecutor();
int count = 50;
for (int i = 0; i < count; i++) {
final int num = i;
service.execute(()->{
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("task:{}",num);
});
}
service.shutdown();
}
}
newScheduledThreadPool
@Slf4j
public class Demo004ScheduledThreadPool {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
// 单次调用,延时3秒后执行
service.schedule(()->{
log.info("delayed for 3");
},3, TimeUnit.SECONDS);
// 循环调用,延迟5秒后每3秒运行一次
service.scheduleAtFixedRate(()->{
log.info("scheduleAtFixedRate");
},5,3,TimeUnit.SECONDS);
// service.shutdown();
// 功能类似于Timer
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
log.info("Timer");
}
},new Date(),1000*5);
}
}
线程池的合理配置
- cpu密集型任务,就需要尽量压榨cpu,参考值为cpu数量+1
- io密集型,参考值可设置为2*cpu数量
死锁
死锁条件
- 互斥条件
- 请求和保持条件
- 不剥夺条件
- 环路等待条件
多线程并发最佳实践
- 使用本地变量
- 使用不可变类
- 最小化锁的作用域范围:
阿姆达尔定律: S = 1/(1-a+a/n) - 使用线程池的executor,而不是直接new thread执行
- 宁可使用同步也不要使用线程的wait和notify
- 使用BlockingQueue实现生产-消费模式
- 使用并发集合而不是加了锁的同步集合
- 使用semaphore创建有界的访问
- 宁可使用同步代码块也不使用同步的方法
- 避免使用静态变量
网友评论