带着目的性去学习源码
并行与并发
- 并发的级别可以分类成:阻塞,无饥饿,无障碍,无锁,无等待。
- 原子性,指一个操作是不可中断的即使多个线程一起执行的时候一个操作一旦开始就不会被其他线程干扰。
- 可见性,当一个线程修改了某一个共享变量的值其他线程马上就能知道这个修改。由于缓存cache优化或硬件优化,指令重排等原因都可能导致一个线程修改不会立即被其他线程察觉。
- 有序性,这主要是涉及多个线程下之间相互调用时出现的指令执行先后顺序不一致的问题,但是在单个线程中是执行顺序是一定的。重排序问题的出现是因为CPU在处理汇编指令的时候对每条命令进行流水线处理,为了在ADD,SUB等命令等待停顿时做点其他的事开始先执行其他指令。
并行线程
- 线程的启动和关闭
不建议使用stop关闭,stop是强制关闭并释放所有的线程这样很大程度破坏了原子性导致最后数据不一致的情况。可以自定义局部变量volatile stop来控制。 - 线程中断
线程中断线程并不会立即退出而是给线程发一个通知有人希望你退出了,至于目标线程如何处理由其自己决定。
public void Thread.interrupt();
public boolean Thread.isInterrupted();
public static boolean Thread.interrupted(); // 判断是否中断并清除当前中断状态
使用interrupt同样可以结束线程但是效果更加的强劲。如果线程在sleep期间被中断则会运行期异常InterruptedException如果这时捕获到异常则需开始处理。
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread() {
@Override
public void run() {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("thread break");
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("interrupted when sleep");
//再次中断自己,表示退出。置上标志位
Thread.currentThread().interrupt();
}
Thread.yield();
}
}
};
thread.start();
Thread.sleep(2000);
thread.interrupt();
}
-
等待(wait)和通知(notify)
无论是wait还是notify,在等待和执行之前都需先获取到对象锁 -
等待线程结束(join)和谦让(yield)
有时候一个线程的输入非常依赖另一个或多个线程的输出,这时这个线程就必须等待依赖线程执行完毕才能继续执行。这就是join功能
yield则表示退出当前CPU,重新参与竞争不是退出。
并发包
1 锁
1.1 重入锁ReetrantLock
对于同一个线程可以多次进入同一个锁而不会产生死锁的问题。但是同时lock几次也要释放几次不然可能造成死锁
- 中断响应
- 锁申请等待限时
- 公平锁
Condition
Semaphore
读写锁ReadWriteLock
CountDownLatch
1.2 线程池
public void createPool(){
BlockingDeque<Object> taskQueue = new LinkedBlockingDeque<>(50);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,100,10L,TimeUnit.SECORD,taskQueue, new TaskRejectedExecutionHandler());
}
// 饱和策略
class TaskRejectedExecutionHandler implements RejectedExecutionHandler{
void rejectedExecution(Runnable r, ThreadPoolExecutor executor){ ... }
}
线程池参数
corePoolSize:线程池核心数量
workQueue:任务队列,被提交但是尚未执行。
`ArrayBlockingQueue`:基于数组结构的有界阻塞队列FIFO;
`LinkedBlockingQueue`: 基于链表结构的无界阻塞队列FIFO,若线程增加的速度和处理的速度相差过大则队列会一直保持增长直至内存消耗过尽,如静态工厂方法Executors.newFixedThreadPool() ;
`SynchronizedQueue`:不存储元素的阻塞队列每个插入操作都必须等待一个移除操作,它不会涉及任务的真实保存如果没有线程了则尝试创建线程如果已经达到最大容量则拒绝,所以它的maximumPoolSize必须设置大一点。Executors.newCachedThreadPool() ;
`PriorityBlockingQueue`:具有优先级别的无界阻塞队列
maximumPoolSize:最大线程数
keepAliveTime:超过corePoolSize的线程或者`allowCoreThreadTimeOut`为true的主线程使用这个作为超时时间。
threadFactory:设置线程池中线程的参数比如name辨识启动线程,daemo守护主线程一旦主线程退出则线程池停止。
handler:拒绝策略
核心线程 & 最大线程数:
线程池创建和关闭
线程池的创建不能使用Executors去创建而是通过ThreadPoolExecutor的方式。Executors的各个方面的弊端如下:
1.`newFixedThreadPool `,提供固定数量的线程池,使用的队列是无界阻塞LinkedBlockingQueue<>()默认Integer_MAX当提交频繁队列可能迅速膨胀从而消耗资源。
2.`newCacheThreadPool` ,new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); 它的corePoolSize大小为0,maximumPoolSize无限大当有线程提交直接进入SynchronousQueue队列,直接提交给线程池创建新的线程执行任务。当有大量任务被提交时线程池就会不断的创建新的线程处理那么线程池的资源就会被消耗。
3.`newSingleThreadExecutor` ,单一把线程池corePoolSize和miximumPoolSize大小设置为1。
4. newScheduledThreadPool` ,
线程池的关闭
线程池实现步骤
- 当前线程数小于
corePoolSize
创建新的线程去处理任务。 - 到达核心线程
corePoolSize
则加入阻塞队列BlockingQueue<Runable>
。 - 阻塞队列满了则创建新的线程处理任务。
- 运行线程大于
maximumPoolSize
则实行拒绝策略。
线程池的选择策略
corePoolSize = Ncpu * Ucup(100%) * (1+ w/c ) ;
w/c == 阻塞时间 / 计算时间
IO密集型线程池:如果存在IO则阻塞时间大于计算时间w/c>1,但是考虑到系统内存有限每次开启一个线程都需要内存空间,保守取1就是2Ncpu。
计算密集型的线程池:假设没有等待,则W/C = 0即 Nthreads = Ncpu.
线程池底层原理
**** 你的思维习惯正在限制你的成长
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
1.如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,command作为这个线程的第一个任务.
2.如果任务成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程(因为可能存在有些线程在我们上次检查后死了) 或者 从我们进入这个方法后,pool被关闭了所以我们需要再次检查state,如果线程池停止了需要回滚入队列,如果池中没有线程了,新开启 一个线程
3.如果无法将任务入队列(可能队列满了),需要新开区一个线程(自己:往maxPoolSize发展)如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
1.3 JDK并发容器
-
ConcurrentHashMap
线程安全的HashMap -
CopyOnWriteArrayList
在读多写少安全的ArrayList -
ConcurrentLinkedQueue
线程安全的LinkedList,高效读写的队列
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
if (p.casNext(null, newNode)) {
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
}
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
- BlockingQueue 阻塞队列适用于数据共享的通道
- ConcurrentSkipListMap 跳表的实现,快速查找
锁的优化
使用锁的建议
- 减少锁的粒度
- 读写锁来代替独占锁
JVM对锁的优化
- 锁偏向
- 轻量锁
- 自旋锁
另一种"锁" -- ThreadLocal
无锁
无锁是一种乐观策略它会假设对资源的访问时没有冲突的,既然没有冲突就不需要等待所以所有的线程都可以在不停顿的状态下持续执行。如果遇到冲突就会Compare And Swap比较然后交换。CAS主要包含三个参数CAS(V,E,N),V表示要更新的变量,E表示预期值,N表示新值。
以AtomicInteger为例
public final int incrementAndGet(){
for( ; ; ){
int current = get()'
int next = current+1;
// 在将新值next写入的时候当前值要等于刚刚取得的current
if(compareAndSet(current,next)) return next;
}
}
死锁
- 案例:“哲学家”吃饭-思考问题
- 解决办法:1.使用无锁的函数;2.重入锁的设置中断或限时等待
并行模型与算法
生产者与消费者
网友评论