主要三部分:
- 同步控制工具
- 线程池
- 并发容器
1. 同步控制
1.1 重入锁(sychronized,wait和notify的替代品,增强版)
- 与sychronized相比,重入锁ReentrantLock有着显示的操作过程,程序员必须手动指定加锁解锁,因此对逻辑控制的灵活性要好于sychronized。
- ReentrantLock可以多次加锁,但是必须相同次数解锁。
- ReentrantLock可以被中断。两个线程互相申请对方持有的锁,那么两个线程发生死锁一直得不到运行,这时可以中断一个线程。
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public IntLock(int lock) {
this.lock = lock;
}
@Override
public void run() {
try {
if(lock == 1) {
lock1.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock2.lockInterruptibly();
}else {
lock2.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock1.lockInterruptibly();
}
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
if(lock1.isHeldByCurrentThread()) {
lock1.unlock();
}
if(lock2.isHeldByCurrentThread()) {
lock2.unlock();
}
System.out.println("线程退出"+Thread.currentThread().getId());
}
}
public static void main(String[] args) {
IntLock r1 = new IntLock(1);
IntLock r2 = new IntLock(2);
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();t2.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
t2.interrupt();
}
- 除了等待外部通知(中断),要避免死锁,还可以用tryLock()限时等待。tryLock()可以带参数,如果不带参数,代表没有等待时间,申请不到就立刻返回可以有效避免死锁。
public class TimeLock implements Runnable{
static ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
try {
if(lock.tryLock(5, TimeUnit.SECONDS)) {
Thread.sleep(6000);
}else {
System.out.println("申请锁失败,超时");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
if(lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
public static void main(String[] args) {
Thread t1 = new Thread(new TimeLock());
Thread t2 = new Thread(new TimeLock());
t1.start();t2.start();
}
}
- ReentrantLock锁可以设置为公平锁,而sychronized是非公平的
public ReentrantLock(boolean fair)
ReentrantLock的几个重要方法:
- lock()
- lockInterruptibly()
- tryLock()
- tryLock(long time, TimeUnit unit)
- unlock()
1.2 重入锁的好搭档:Condition
- 与wait,notify相似,这两个与sychronized相配合,Condition与ReentrantLock配合使用。
- 在进行等待唤醒操作时,需要获得锁,被唤醒后也需要先获得锁才能就绪运行。
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
void await()//使当前线程等待,同时释放锁,可以响应中断
void awaitUninterruptibly()//不能被中断
long awaitNanos(long nanosTimeout)
boolean await(long time, TimeUnit unit)
boolean awaitUntil(Date deadline)
void signal()//唤醒一个等待的线程
void signalAll()//唤醒所有线程
1.3 允许多个线程同时访问:信号量(Semaphore)
- sychronized和ReentrantLock都只允许同时只有一个线程访问一个资源。
- Semaphore允许指定多个线程访问某个资源。
//构造函数
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)//公平
//方法
public void acquire()//尝试获得准入许可,若无法获得,则等待,可以被中断
public void acquireUninterruptibly()//无法被中断
public boolean tryAcquire()//尝试获得准入许可,不等待
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()//释放许可
1.4 ReadWriteLock读写锁
- 系统中读操作远大于写操作,可以使用读写锁,提升性能。
- 读-读不互斥
- 读-写互斥
- 写-写互斥
static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
//调用时传入相应的锁
public int handleRead(Lock lock) {
try{
lock.lock();
//read something
return 0;
}finally {
lock.unlock();
}
}
public int handlerite(Lock lock) {
try{
lock.lock();
//write something
return 0;
}finally {
lock.unlock();
}
}
1.5 倒计时器:CountDownLatch
- 可以让一个线程等待直到倒计时结束,再开始执行。(相当于这些线程都调用了join())
//构造函数
public CountDownLatch(int count)
static CountDownLatch end = new CountDownLatch(10);
Runnable r = new Runnable() {
public void run() {
//do domething
end.countDown(); //通知计数器减一
}
};
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
//创建10个线程
}
end.await(); //主线程等待计数器归零
//继续执行
}
1.6 循环栅栏:CyclicBarrier
- CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
- CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。
1. 默认的构造方法是
CyclicBarrier(int parties)
其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
2. 带参构造方法
CyclicBarrier(int parties, Runnable barrierAction)
完成一次计数后,执行指定动作。
- CyclicBarrier.await()会抛出两种异常,InterruptedException和BrokenBarrierException。
- InterruptedException很好理解,大部分迫使线程等待的方法都可能抛出这个异常。
- BrokenBarrierException表示这个栅栏坏了,不能等到所有线程到达了。
1.7 线程阻塞工具类:LockSupport
- 可以在线程的任意位置让线程阻塞。
- 和suspend相比,弥补了resume发生在前导致线程无法继续的情况。
- 和object.wait()相比,不需要先获得锁,也不会抛出InterruptedException。
- 即使unpark先于park发生,也没关系。
static park():静态方法,阻当前线程
static parkNanos()
static parkUntil()
static unpark(Thread thread)
- LockSupport和每个使用它的线程都与一个许可(permit)关联。permit相当于1,0的开关,默认是0,调用一次unpark就加1变成1,调用一次park会消费permit, 也就是将1变成0,同时park立即返回。再次调用park会变成block(因为permit为0了,会阻塞在这里,直到permit变为1), 这时调用unpark会把permit置为1。每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累。
1.8 线程间交换数据:Exchanger
- Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水A";// A录入银行流水数据
System.out.println("1号:" + exgr.exchange(A));
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "银行流水B";// A录入银行流水数据
System.out.println("2号:" + exgr.exchange(B));
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
}
//输出:
//1号:银行流水B
//2号:银行流水A
2. 线程复用:线程池
- 与进程相比,线程是轻量级的,但是创建和关闭依然需要花费时间,如果为每一个小任务都创建线程,很有可能出现创建销毁线程所占用的时间大于真实工作所消耗的时间。
- 线程本身占用内存空间,大量线程增加GC工作量。
- 所以,对线程的数量要加以控制。
2.1 什么是线程池?
线程池预先创建一些长期保持激活状态的线程对象,当需要使用线程时,从线程池中拿出一个线程对象直接使用,使用完毕后将该线程归还给线程池,从而免去线程对象创建和销毁的成本。
线程池
2.2 JDK线程池
JDK提供了一套Executor框架,本质就是一个线程池。
Executor框架结构图
- ThreadPoolExecutor表示一个线程池
- Executors类扮演一个线程池工厂的角色,通过Executors可以取得一个拥有特定功能的线程池。
【几种线程池】
- newFixedThreadPool:固定数量。当一个任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有空闲线程,便处理任务队列中的任务。
- newSingleThreadExecutor:数量为1。
- newCachedThreadPool:数量不确定。若有空闲线程,则优先使用空闲线程。若没有,则创建新的线程。任务完毕后,线程归还线程池。
- newSingleThreadScheduledExecutor:
- newScheduledThreadPool:与上面一个一样,返回一个ScheduledExecutorService对象,扩展了在给点时间执行某任务的功能。上面数量为1,下面数量可以指定。
//固定数量的线程池,同时只能处理5个任务,所以隔两秒打印5句。
public class ThreadPoolDemo {
public static void main(String[] args) {
Runnable r1 = ()->{
System.out.println(System.currentTimeMillis()+":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
};
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
es.submit(r1);
}
}
}
newScheduledThreadPool这个方法可以根据时间需要对线程进程调度
schedule(Runnable command, long delay, TimeUnit unit)
scheduleAtFixedRate(Runnable command, long initDelay, long period, TimeUnit unit)
scheduleWithFixedRate(Runnable command, long initDelay, long period, TimeUnit unit)
- schedule:在给定时间,对任务进行一次调度。
- scheduleAtFixedRate:对上次任务开始时间的delay一段时间后调度下次任务(如果周期小于任务执行时间,则在任务结束后立刻执行,不会提前)
- scheduleWithFixedRate:对上次任务结束时间的delay一段时间后调度下次任务
-
如果某次运行出现异常,后续所有都会停止。
两者区别
public class ScheduledExecutorServiceDemo {
public static void main(String[] args) {
Runnable r1 = ()->{
try {
Thread.sleep(3000);
System.out.println(System.currentTimeMillis()/1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleAtFixedRate(r1, 0, 2, TimeUnit.SECONDS);
}
}
线程池核心实现
- newFixedThreadPool 、newSingleThreadExecutor 、newCachedThreadPool都是使用了ThreadPoolExecutor实现
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//核心ThreadPoolExecutor的一个构造函数
public ThreadPoolExecutor(int corePoolSize, //指定线程池线程数量
int maximumPoolSize, //最大线程数量
long keepAliveTime, //线程数量超过corePoolSize时,空闲线程存货时间
TimeUnit unit,
BlockingQueue<Runnable> workQueue, //任务队列,被提交但是尚未被执行的任务如何存放
ThreadFactory threadFactory, //线程过程,用来创建线程,一般默认
RejectedExecutionHandler handler) //拒绝策略,任务太多如何拒绝
【任务队列】
- 直接提交队列:SychronousQueue对象实现,这个队列没有容量。总是将新任务交给线程执行,如果没有空闲线程,则尝试创建,如线程数量达到最大,则执行拒绝策略。所以要配合很大的maximumPoolSize使用。
- 有界任务队列:ArrayBlockingQueue对象实现,构造时要加上队列最大容量。逻辑与下图一样。
- 无界任务队列:LinkedBlockingQueue对象实现,除非系统资源耗尽,否则入队总可以成功。
- 优先任务队列:PriorityBlockingQueue对象实现,队列中的任务不是按照先进先出提交,有一个优先级。
【为什么先让任务进入队列,而不是直接创建线程?】
- 因为创建新线程时需要先获得全局锁,可能造成严重的可伸缩瓶颈。
【拒绝策略】
- AbortPolicy:直接抛出异常,阻止系统正常执行。
- CallerRunsPolicy:只要线程池未关闭,直接在调用者线程中执行任务。任务提交线程性能可能会急剧下降。
- DiscardOldestPolicy:丢弃最老任务,就是即将被执行的下个任务,并且重新提交当前任务。
- DiscardPolicy:丢弃无法处理的任务,不予任何处理。
//自定义线程factory和拒绝策略
ExecutorService es = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return null;
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString()+" is discard");
}
});
【扩展线程池】
- ThreadPoolExecutor提供了beforeExecute(),afterExeute(),terminated()三个接口对线程池进行控制。
public class ExtThreadPool {
public static void main(String[] args) {
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MICROSECONDS,
new LinkedBlockingQueue<Runnable>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行:"+((Mytask)r).name);
}
@Override
protected void afterExecute(Runnable r,Throwable t) {
System.out.println("执行完成:"+((Mytask)r).name);
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
for (int i = 0; i < 5; i++) {
Mytask mytask = new Mytask("线程"+i);
es.execute(mytask);
}
es.shutdown();//关闭线程池,如果还有线程在运行,则等待,相当于只发送了一个关闭信号。
}
}
class Mytask implements Runnable{
public String name;
public Mytask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("正在执行:Thread ID:"+Thread.currentThread().getId()+",taskName = "+name);
}
}
【注意】
- submit方法可能会吃掉线程的一些异常,比如除数为0,可以改用execute()方法。
- execute方法接受Runnable实例,用于提交不需要返回值的任务。
- submit方法用于提交需要返回值的任务。它会返回一个future对象,可以判断是否允许成功,也可以get()返回值。
2.3 fork/join框架
- 分而治之。
- 毫无顾忌fork()开启线程会严重运行性能。因此JDK提供了ForkJoinPool线程池。
- 可以向ForkJoinPool提交ForkJoinTask任务,ForkJoinTask有两个子类,RecursiveAction(没有返回值)和RecursiveTask(可以携带返回值)。
public class CountTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000;
private long start;
private long end;
public CountTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if(end - start < THRESHOLD) {
for(long i = start; i <= end; i++) {
sum += i;
}
}else {
long step = (start + end) / 100;
ArrayList<CountTask> subTasks = new ArrayList<>();
long pos = start;
for(int i = 0; i < 100; i++) {
long lastOne = pos + step;
if(lastOne > end)lastOne = end;
CountTask subTask = new CountTask(pos, lastOne);
pos += step + 1;
subTasks.add(subTask);
subTask.fork();
}
for(CountTask task : subTasks) {
sum += task.join();
}
}
return sum;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(0, 200000L);
ForkJoinTask<Long> result = forkJoinPool.submit(task);
System.out.println(result.get());
}
}
3. JDK并发容器
3.1 几种并发容器简介
- ConcurrentHashMap:高效的并发HashMap
- CopyOnWriteArrayList:读多写少时,性能远好于Vector
- ConcurrentLinkedQueue:高效的并发队列,使用链表实现
- BlockingQueue:一个接口,表示阻塞队列,适用于数据共享的通道
- ConcurrentSkipListMap:跳表实现的Map,快速查找。
另外可以用Collections工具包装成线程安全。
//内部其实就是和HashMap,通过一个对象锁进行同步,效率堪忧。
Map map = Collections.synchronizedMap(new HashMap<>());
3.2 高效读取:不变模式下的CopyOnWriteArrayList
- 读写锁在读-读时不会阻塞,但是读-写会阻塞。
- CopyOnWriteArrayList只有在写-写时才会阻塞,读时不加锁。
- 原理:在写入前进行一次自我复制,只修改副本,写完之后将副本进行替换。这样就不会影响读操作了。
3.3 数据共享通道:BlockingQueue
- BlockingQueue之所以适合作为数据共享的通道,关键在于Blocking,服务线程(从队列中获取消息)在队列为空时进行等待,当有新消息入队时,自动唤醒服务线程。
例如:ArrayBlockingQueue,take()方法和put()方法,在空和满时会等待
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();//take操作时,如果队列为空,则让当前线程等待在notEmpty上,新元素入队时,进行一次notEmpty的唤醒。
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) //队列满,入队线程等待
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); //插入元素,不空了,通知等待取元素的线程。
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //元素被挪走时,出现空位,通知等待入队的线程
return x;
}
网友评论