Java自带的线程安全队列
介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist |
ConcurrentLinkedQueue | unbounded | 无锁 | linkedlist |
LinkedTransferQueue | unbounded | 无锁 | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
狭义上真正的锁,或者说所谓的重量级锁,是需要操作系统配合的,某一个线程占用一个CPU在执行,运行到Lock之后,底层调用操作系统test and set指令对锁对应的一个内存中的整型数进行原子性设置,设置成功代表获取锁,线程继续执行,否则代表未竞争到锁,线程挂起,等待锁释放后自己被唤醒。另外,由于多核的原因,在操作系统对上述整型数进行设置的时候还要能够锁总线,确保同时只有一个CPU核能够通过总线来写内存,这是通过一个硬件HLOCK Pin高低电位来实现的,低电位代表总线此刻被某个CPU核心独占。
不加锁的队列,使用cas操作原子变量status,各线程cas(status, 0, 1),返回true说明获取到“锁”、可以操作队列、比如入队和出队,返回0则说明获取“锁”失败。然后我们发现java提供的两个无锁队列都是无界的。稳定性要求高的系统,使用无界队列有一定风险,如果生产方速度过快,会导致队列里积压大量任务,由此可能会导致内存溢出,或者消费端疲于处理。
所以我们要找个有界队列,然后我们可能为了减少gc对系统性能的影响,会尽可能选择array/heap格式的数据结构,这样算下来也只有ArrayBlockingQueue了。
但是,ArrayBlockingQueue整体1把锁,恰恰是性能最差的那个。
锁带来的开销是什么?
- 线程竞争不到锁被挂起,锁释放时线程又被唤醒,这个过程中带来上下文切换,带来比较大的开销
- 线程在等待锁的过程中,会被中断,不能做任何事情。
- 持有锁的线程也可能会遇到缺页错误、调度延迟或者其他类似的系统级的情况,这个线程无法执行下去,变相的相当于所有需要这个锁都无法执行。
一般的,不加锁的性能 > CAS的性能 > 加锁的性能
在高度竞争的情况下,锁的性能可能会超过不断循环CAS的性能,所以会有所谓的锁升级: 无锁,偏向所,轻量锁,重量锁。
伪共享问题
CPU操作L1\L2\L3高速缓存的时候的最小单位是缓存行cache line,64字节,现在假设一个场景:一个cache line已经在两个CPU核心的L1缓存里了,这个cache line里边有两个变量,X和Y,分别由两个线程去内存里修改这两个变量的本体。X被线程1修改之后,根据MESI协议,这个缓存行变为失效,但是显然同时波及了变量Y,CPU2上执行的线程2也会发现自己的L1中的Y缓存也失效了,只能去内存中读取。这就是伪共享问题。伪共享问题的危害是导致CPU高速缓存失效,而转为读取更慢的内存。
关于RingBuffer
核心数据结构RingBuffer是个环形数组,容量自定义、超过之后会从头覆写,但下标用的是long,几乎是无尽的,所以不用担心用完。
byte的取值范围为-128~127,占用1个字节(-2的7次方到2的7次方-1)
short的取值范围为-32768~32767,占用2个字节(-2的15次方到2的15次方-1)
int的取值范围为(-2147483648~2147483647),占用4个字节(-2的31次方到2的31次方-1)
long的取值范围为(-9223372036854774808~9223372036854774807),占用8个字节(-2的63次方到2的63次方-1)
2^63-1个slot,假如每秒使用100万个slot、即100万QPS的处理速度,也可以用9223372036854秒,约为292,471年...
所以,用就是了!
多生产者情况需要解决的问题
-
写数据,计算n个生产者本次一共要写入多少个slot,然后从ring buffer上申请这个slot区间。在这个slot区间上分成n个段,每个段对应1个生产者、每个段的长度就是这个生成者自己要写入的slot数。这样就解决了“多个生产者线程重复的写了同一个slot”的问题。
-
读数据,消费者线程申请读取到n下标,首先write cursor一定是要大于n的,因为要保证读取的是已经可以允许写入的slot,其次,即便如此仍还不能确定消费线程本次可以直接读取到n,因为此时生产者正在并行的写入,write cursor只是“当前可以写入到这里为止”的意思,要防止消费者线程读到还未真正写入的slot,disruptor采用的是另一个与RingBuffer一样大小的AvailableBuffer,生产者每次写入相应的RingBuffer的slot之后,就去AvailableBuffer上对应的位置置写入标识为已写入。
例如:消费者读线程申请读取到下标从3到11的slot,判断writer cursor>=11。然后开始读取AvailableBuffer,从3开始,往后读取,发现下标为7的slot没有生产成功,于是WaitFor(11)返回6,表明RingBuffer现在只能读取到slot下标6。这样本次消费者最终读取从3到6共计4个slot里边的元素。
Disruptor在Log4j2中的应用
笔者曾经主持优化的一个springboot应用,核心功能是要接收客户端上报的行为日志,将其输出到本地日志文件(当然可以直接输出到kafka这样的分布式队列式文件系统,但当时确实限于团队条件先直接输出到本地日志里了),使用的是logback,在压测与优化过程中,将log appender配置为async appender之后,性能有了极大的提升,工作线程将日志输出任务丢进async appender的队列之后马上返回,压测tps提升。而logback的async appender所使用的生产者消费者模式中的队列用的是ArrayBlockingQueue,Log4j2在吸收了log4j与logback的优点基础之上,进一步优化了性能,以异步日志为例,其使用了Disruptor代替了ABQ,根据美团公司对Log4j2的实践,其异步日志的性能较logback提升了1个数量级,较同步日志提升了2个数量级。
https://tech.meituan.com/2016/11/18/disruptor.html 《高性能队列——Disruptor》美团技术团队
网友评论