使用线程安全的类
StringBuilder -> StringBuffer
SimpleDateFormat -> JodaTime
ArrayList -> Vector, Stack, CopyOnWriteArrayList
HashSet -> Collections.synchronizedSet(new HashSet()), CopyOnWriteArraySet
TreeSet -> Collections.synchronizedSortedSet(new TreeSet()), ConcurrentSkipListSet
HashMap -> HashTable, ConcurrentHashMap, Collections.synchronizedMap(new HashMap())
TreeMap -> ConcurrentSkipListMap, Collections.synchronizedSortedMap(new TreeMap())
JUC中有非常多的类,将部分类按功能进行分类,分别是:
原子atomic包
比synchronized功能更强大的lock包
线程调度管理工具
线程安全与并发工具集合
线程池
AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架,它是JUC并发包中的核心基础组件。JUC大大提高了Java的并发能力,AQS是JUC的核心。
![](https://img.haomeiwen.com/i2449169/eb4324de3e47ab33.png)
- 同步队列:AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
- 继承实现:AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法acquire/release来管理同步状态。
- 同步状态维护:AQS使用一个int类型的成员变量state来表示同步状态,当state > 0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法getState()、setState(int newState)、compareAndSetState(int expect,int update)来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。
CountDownLatch
计数器闭锁是一个能阻塞主线程,让其他线程满足特定条件下主线程再继续执行的线程同步工具。
public class CountDownLatchTest {
private static final int COUNT = 1000;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
CountDownLatch countDownLatch = new CountDownLatch(COUNT);
for (int i = 0; i < COUNT; i++) { //countDown方法的执行次数一定要与countDownLatch的计数器数量一致,否则无法将计数器清空导致主线程无法继续执行
int finalI = i;
executorService.execute(() -> {
try {
Thread.sleep(3000);
System.out.println(finalI);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(1, TimeUnit.SECONDS); //主线程只等1秒,超过之后继续执行主线程
executorService.shutdown(); //当正在执行的线程执行完成之后再关闭而不是立即停止线程
System.out.println("done!");
}
}
这段程序先设置CountDownLatch为100,然后在其他线程中调用100次countDown方法,随后主程序在等待100次被执行完成之后,继续执行主线程代码
Semaphore
信号量是一个能阻塞线程且能控制统一时间请求的并发量的工具。比如能保证同时执行的线程最多200个,模拟出稳定的并发量。
public class CountDownLatchTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
Semaphore semaphore = new Semaphore(3); //配置只能发布3个运行许可证
for (int i = 0; i < 100; i++) {
int finalI = i;
executorService.execute(() -> {
try {
semaphore.acquire(3); //获取3个运行许可,如果获取不到会一直等待,使用tryAcquire则不会等待
Thread.sleep(1000);
System.out.println(finalI);
semaphore.release(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
由于同时获取3个许可,所以有即使开启了100个线程,但是每秒只能执行一个任务
原理
new Semaphore(3)传入的3就是AQS中state的值,也是许可数的总数,在调用acquire时,检测此时许可数如果小于0,就将被阻塞,然后将线程构建Node进入AQS队列
使用场景
数据库连接并发数,如果超过并发数,等待(acqiure)或者抛出异常(tryAcquire)
CyclicBarrier
可以让一组线程相互等待,当每个线程都准备好之后,所有线程才继续执行的工具类
public class CyclicBarrierTest {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
System.out.println("ready done callback!");
});
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
int finalI = i;
Thread.sleep(1000);
executorService.execute(() -> {
try {
System.out.println(finalI + "ready!");
cyclicBarrier.await();
// cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); // 如果某个线程等待超过2秒就报错
System.out.println(finalI + "go!");
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
原理
与CountDownLatch类似,都是通过计数器实现的,当某个线程调用await之后,计数器减1,当计数器大于0时将等待的线程包装成AQS的Node放入等待队列中,当计数器为0时将等待队列中的Node拿出来执行。
与CountDownLatch的区别:
CountDownLatch是一个线程等其他线程,CyclicBarrier是多个线程相互等待
CyclicBarrier的计数器能重复使用,调用多次
使用场景
1.CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
2.有四个游戏玩家玩游戏,游戏有三个关卡,每个关卡必须要所有玩家都到达后才能允许通过。其实这个场景里的玩家中如果有玩家A先到了关卡1,他必须等到其他所有玩家都到达关卡1时才能通过,也就是说线程之间需要相互等待。
ReentrantLock
名为可重入锁,其实synchronized也可重入,是JDK层级上的一个并发控制工具
原理
// 以公平锁为例,从lock.lock()开始研究
final void lock() { acquire(1);}
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 首先通过公平或者非公平方式尝试获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 然后构建一个Node放入队列中并等待执行的时机
selfInterrupt();
}
// 公平锁设置锁执行状态的逻辑
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //如果state是0,就是当前的锁没有人占有
if (!hasQueuedPredecessors() && // 公平锁的核心逻辑,判断队列是否有排在前面的线程在等待锁,非公平锁就没这个条件判断
compareAndSetState(0, acquires)) { // 如果队列没有前面的线程,使用CAS的方式修改state
setExclusiveOwnerThread(current); // 将线程记录为独占锁的线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 因为ReentrantLock是可重入的,线程可以不停地lock来增加state的值,对应地需要unlock来解锁,直到state为零
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 接下来要执行的acquireQueued如下
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 再次使用公平锁逻辑判断是否将Node作为头结点立即执行
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
ReentrantLock与synchronized的区别
1.用法。synchronized既可以很方便的加在方法上,也可以加载特定代码块上,而lock需要显示地指定起始位置和终止位置。
2.实现。synchronized是依赖于JVM实现的,而ReentrantLock是JDK实现的
3.性能。synchronized和lock其实已经相差无几,其底层实现已经差不多了。但是如果你是Android开发者,使用synchronized还是需要考虑其性能差距的。
4.功能。ReentrantLock功能更强大。
ReentrantLock可以指定是公平锁还是非公平锁,而synchronized只能是非公平锁,所谓的公平锁就是先等待的线程先获得锁。
ReentrantLock提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程
ReentrantLock提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()来实现这个机制
我们控制线程同步的时候,优先考虑synchronized,如果有特殊需要,再进一步优化。ReentrantLock如果用的不好,不仅不能提高性能,还可能带来灾难。
Condition
条件对象的意义在于对于一个已经获取锁的线程,如果还需要等待其他条件才能继续执行的情况下,才会使用Condition条件对象。与ReentrantLock结合使用,类似wait与notify。
public class ConditionTest {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread thread1 = new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " run");
System.out.println(Thread.currentThread().getName() + " wait for condition");
try {
condition.await(); // 1.将线程1放入到Condition队列中等待被唤醒,且立即释放锁
System.out.println(Thread.currentThread().getName() + " continue"); // 3.线程2执行完毕释放锁,此时线程1已经在AQS等待队列中,则立即执行
} catch (InterruptedException e) {
System.err.println(Thread.currentThread().getName() + " interrupted");
Thread.currentThread().interrupt();
}
} finally {
lock.unlock();
}
});
Thread thread2 = new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " run");
System.out.println(Thread.currentThread().getName() + " sleep 1 secs");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.err.println(Thread.currentThread().getName() + " interrupted");
Thread.currentThread().interrupt();
}
condition.signalAll(); // 2.线程2获得锁,signalAll将Condition中的等待队列全部取出并加入到AQS中
} finally {
lock.unlock();
}
});
thread1.start();
thread2.start();
}
}
Future、FutureTask、CompletableFuture 、Callable与Runnable
public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(() -> {
try {
System.out.println("doing");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "done";
});
System.out.println(future.get());
}
}
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("doing");
Thread.sleep(1000);
return "down";
});
executorService.submit(futureTask);
// new Thread(futureTask).start();
System.out.println(futureTask.get());
executorService.shutdown();
}
}
public class CompletableFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> string1Future = CompletableFuture.supplyAsync(() -> {
System.out.println("doing string1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("done string1");
return "string1";
});
CompletableFuture<String> string2Future = CompletableFuture.supplyAsync(() -> {
System.out.println("doing string2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("done string2");
return "string2";
});
CompletableFuture.allOf(string1Future, string2Future).join();
System.out.println(string1Future.get() + "and" + string2Future.get());
}
}
BlockingQueue
https://wsmajunfeng.iteye.com/blog/1629354
Fork Join框架
fork join框架是JDK7中出现的一款高效的工具,Java开发人员可以通过它充分利用现代服务器上的多处理器。它是专门为了那些可以递归划分成许多子模块设计的,目的是将所有可用的处理能力用来提升程序的性能。fork join框架一个巨大的优势是它使用了工作窃取算法,可以完成更多任务的工作线程可以从其它线程中窃取任务来执行
但这样会要额外地对任务分派线程进行管理,无形地会增加管理的难度和复杂度,还可能碰到资源竞争导致的同步操作与性能损耗
网友评论