Java Util Concurrent 这个包值得反复看
这里只是记录一些相对比较粗略的设计tips,
越来越觉得优秀的软件设计都在细节里面,所以有心人需要不断的研究细节;
AQS abstractQueueSynchronizer
- 通过cas + 队列 的方式来实现 同步器的效果; cas 主要处理 state 的状态,0 为 unlock状态,非0 为locked 状态;
队列用来维护等待的线程,通过连接关系来处理 先驱和后继线程, 在公平锁实现的时候,很有用; - 对于state 变量, 如果只支持0.1,则是独占锁,如果支持 state > 0, 则为共享锁;ReentrantLock;ReentrantReadWriteLock
是典型的shared锁,每进入一次就可以state增加一次; - 读写锁并没有使用两个state,而是一个state的高低bit 位,来记录读写的状态; bit 高低位在juc中用的很多;
- aqs 是其他同步器的基石; countdownlatch,也是 shared 锁, state =0,的时候 await(tryaccquire)才能够返回;
- cycleBarrier,用的是条件变量condition,由lock创建,可以精确的唤醒一组线程,比object的wait和notify机制要表达能力强;
cycleBarrier比如为10, 那么没调用一个await 就会减一,如果为0, 就 sinallAll, cycleBarrier,达到0之后,会有一个reset操作,所以
cycleBarrier是可以重复使用的; - semaphore,个人经常用来控制流量,机制和countLatch相反, 初始化permit为10个,accquire,如果 state >0, 就不用返回,<=0,则需要阻塞等待;
- 其他用的不是很多的同步器。Exchanger,适合两个线程之间交互信息,特别适合两个线程组成pair的形式,相互准备好数据之后,进行transfer;
另外一个是phaser,可以理解为countdownlatch的增强版本,可以动态增加删除等待的线程数,而且phaser有一个阶段的概念,可以reset重复使用;
线程池
- executor执行框架的核心,queue, 线程, futureTask; 大概有3种;我们最常用的Poolexecutor,schedulePoolExecutor,forkjoin
- ThreadPoolExecutor,的核心,coresize,maxsize,queue,keepalive, abortPolicy;coreszie会一直创建,之后放入pool中,然后maxsize; 如果还有未处理的任务,就使用拒绝策略,
默认 rejectException; 等到任务load没那么大的时候就需要回收线程; java回收的实现也是非常的优雅,他是从 queue 进行pool,如果 keepAlive 时间之后,还没有返回,就会超时,那么这个线程就会回收; - juc中,引入了callable和future 的支持; 本质是一个FutureTask,futuetask 的表达能力要比之前的runnable强很多,有返回值,可以抛出异常,并且可以 get阻塞,还可以取消;
FutureTask,可以被多个线程等待,由一个NodeList维护,runable执行完成之后,会通知等待的线程; - 通过变化线程池的参数,可以组合适用不同场景的线程池,Executors工具类提供了很多静态工厂方法;FixPool用的比较多,还有newCachedThreadPool,coresize=0,maxSize无限大,没有缓存队列,任何任务都创建线程,
适合短平快的工作任务;另外一个重要的参数就是threadFactory,可以定义我们想要创建的线程,比如命名规则,优先级等; - 参数的可变性,由于这些参数的重要性,对参数的修改可以大幅度修改线程池的行为,但是在工厂方法里,有些singleThreadPool,他的时限是一个ProxyWrapper,就是对于参数修改的接口进行了隐藏;
以防止不必要的修改,类似于Immutable的包装形式; - executors里面,还有另外一类,schedulePool,他对应的是scheduleExecuteService;他是common 形式的一个派生,任务是重复执行的主要依赖的是DelayQueue,scheduleTask有时间的信息,task在DelayQueue里面
按照倒计时的时间排序,是一个逆序的堆结构,take的时候,会阻塞线程;直到delay时间到了。 - executors里面,还有一个是forkjoinPool, 这个我们平时没注意,但是有两个地方的内部实现就是用的forkJOinpool。 一个是stream。的parallel模式, 一个就是CompletableFuture的异步模式,都是默认用
forkjoinpool,采用默CPU个线程。 forkJoinPool的实现很复杂,大概的思想就是一个thread 一个pool,然后空闲的thread stealing task;比较适合计算密集型的任务,最后是能够实现 分治模式,子task和父亲task有依赖关系的,
这样可以在一个queue里面,不用切换; - completableFuture,函数是编程范式,异步任务的回调非常方便,依赖completeStage把不同的operation组装成链式结构,运行时由stack处理上下任务的依赖和触发关系;
集合类
- concurrentHashMap,从segment到 slot级别,链表到红黑书,resize 的幂次性;
- navigatableMap/Set, 平时用的少,其实很好用,特别是floor/ceil方法,
- skipListMap/Set,没有使用红黑书,而是使用跳跃表来实现key的有序性;
- copyOnwriteArrayList, 读和写都不互斥,有非常高的读性能,写的时候,从新copy一个数组,然后switch,有短暂的不一致性;写的性能不高;
BlockingQueue
- BlockingQueue,FIFO;DeQueue;两端都可以出入;
- ArrayBlockingQUeue,用的就是环形数组, readIndex和putIndex,空间和效率的平衡;使用一个锁+两个条件遍历,控制空+full状态下的queue的操作;
- linklistBlockingQueue,使用Node节点结构;更加复杂的内存分配和gc压力;使用take和put两把锁,并发吞吐量更高;
- 优先级队列和delayQueue都是堆结构的队列,可以实现自己comparator,控制排序;
- SynchronousQueue,和transfer机制类似, insert 必须要有一个 take配对,目前看用于 cacheThreadPool里面;
其他工具类
- 依赖unsafe工具,读取内存偏移量,然后CAS操作内存的值;同时支持一些基本类型的数组模式,对于数组模式唯一不同的就是计算index 对应的内存偏移量;
- 同时提供了一个高并发下的累加器,longAdd, 超高并发下,比atomicLong的效率高,采用了分段的模式,把long 变成 多个cell段,效果就是对同一个资源的cas,变成对多段的cas;
cell,使用了@sun.misc.Contended ,处理伪共享的概念,每个contented对象64个字节,刚好一个cache Line,而不会连累其他的变量频繁被invalid;** tHashMap 的count方法,基本使用了一模一样的思路。 使用baseCount+counterCell[],分段计数器的方式来计数,以便提高并发度 ** - StampedLock,更加高效的锁模式,读写乐观锁,使用bit 分段技术, 低位记录读线程数,另一位记录写线程数,高位seg记录版本号; 每次更新都需要递增,读的时候直接乐观读,在判断是否时间戳Version
被修改,如果修改退回到readWriteLock; - ThreadLocalRandom, 虽然random也是线程安全,但是每次都会刷新seed,需要加锁互斥;threadlocalRandom为每个线程维护自己的seed,效率更高;
网友评论