美文网首页
Disruptor quickStart!

Disruptor quickStart!

作者: 火兰人一个 | 来源:发表于2019-12-19 16:21 被阅读0次

      说到队列,大家都很熟悉,像生活中不管是吃饭还是买东西基本上都会遇到排队,先排队的人先付款,不允许插队,否则可能会出现下面的情况:

    image

    先进先出,这就是典型的“队列”。

    简单回顾jdk里的队列

    这里简单讲一下以下俩种队列

    1、阻塞队列:

    ArrayBlockingQueue: Object[] + count + lock.condition(notEmpty、notFull)

    入队:

    不阻塞:add、offer 满了直接报错
    
    阻塞:put 满了:notFull.await();(当出队和删除元素时唤醒put操作)
    

    出队:

    take():当空时,notEmpty.await();当有元素入队时唤醒.
    
    poll():当空时直接返回null
    

    LinkedBlockingQueue:Node实现、加锁(读锁、写锁分离)、可选的有界队列。需要考虑实际使用中的内存问题,防止溢出。

    应用:Eexcutors默认是使用LinkedBlockingQueue,但是在实际应用中,更应该手动创建线程池使用有界队列,防止生产者生产过快,导致内存溢出。

    2、延迟队列:

    DelayQueue : PriorityQueue + Lock.condition + leader

    PriorityQueue优先级队列

    condition 延迟等待

    leader 避免不必要的kong等待

    方法:

    getDelay()延迟时间
    
    compareTo()通过该方法比较从PriorityQueue里取值
    

    入队:

    add、put、offer:入队时会将换唤醒等待中的线程,进行一次出队处理

    出队:

    take()阻塞:

    1、如果队列里无数据,元素入队时会被唤醒
    
    2、有数据,会阻塞至时间满足
    

    poll():满足队列有数据并且delay时间不大于0会取出元素,否则立即返回null—可能会抢占成为leader

    还有优先级队列等就不一一细说,有兴趣的同学可以去看一下。

    应用:延时任务:设置任务延迟多久执行;需要设置过期值的处理,例如缓存过期,实现方式:每次getDealy()方法提供一个缓存创建时间与当前时间的差值,出队时compareTo()方法取差值最小的。每次入队时都会重新取出队列里差值最小的值进行处理。

      我们使用队列的,更多的是像生产者、消费者这种场景。这种场景大多数情况又对处理速度有着要求,所以我们会使用多线程技术。使用多线程就可能会出现并发,为了避免出错,我们会选择线程安全的队列。例如ArrayBlockingQueue、LinkedBlockingQueue或者是ConcurrentLinkedQueue,前俩者是通过加锁取实现,后面一种是通过cas去实现线程安全。但是又要考虑到生产者过快可能造出的内存溢出的问题,所以看起来ArrayBlockingQueue是最符合要求的。但是恰恰加锁效率又是最慢的,所以就引出了我们今天需要讨论的主题:Disruptor!

    比较:

    ArrayBlockingQueue VS Disruptor

    看代码。。。

    介绍

      Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中(圈起来要考),使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。

    接下来我们来看一下disruptor是如何做到无阻塞、多生产、多消费的。

    image

    EventFactory:创建消息(任务)的工厂类

    ringBufferSize:容器的长度

    Executor:消费者线程池,执行任务的线程

    ProductType:生产者类型:单生产者、多生产者

    WaitStrategy:等待策略

    下面简单看一下disruptor的代码。

    看代码。。。

    可以看出在调用了start()方法后,消费者线程就已经开启,其中涉及到一个重要的概念:EventProcessor

    BatchEventProcessor主要事件循环,处理disruptor中的event,拥有消费者的Sequence

    image

    另一个核心概念:RingBuffer:它是一个首尾相接的环状的容器,用来在多线程中传递数据。可以看到我们进行生产者时,先从ringbuffer里拿,再进行投递。

    image

    这里使用next()获得的序号为数组中下一个可用的元素,再get(seq)获取到该位置的元素,再进行赋值处理。

    这里的序号是如何产生的呢?

    Sequence:顺序递增的序号来编号,管理交换的数据。生产者和消费者都会有维护自己的Sequence,通过进行比较,来平衡生产者和消费者的关系。消除伪共享(填充缓存行)。

    Sequencer:在生产者和消费者之间快速、正确的传递数据的并发算法

    Sequence Barrier:序号栅栏,用来平衡生产者和消费者之间的关系

    image

    上面说到ringBuffer有定义长度,说明是一个有界的队列,那么可能会出现以下俩种情况:当消费者消费速度大于生产者生产者速度,生产者还未来得及往队列写入,或者生产者生产速度大于消费者消费速度,此时怎么办呢?

    常用的WaitStrategy等待策略(消费者等待)

    BlockingWaitStrategy使用了锁,低效的策略。

    SleepingWaitStrategy对生产者线程的影响最小,适合用于异步日志类似的场景。(不加锁空等)

    YieldingWaitStrategy性能最好,适合用于低延迟的系统,在要求极高性能且之间处理线数小于cpu逻辑核心数的场景中,推荐使用。(无锁策略。主要是使用了Thread.yield()多线程交替执行)

    至此,disruptor的基本核心概念已经介绍完毕!

    Disruptor多边形操作:

    image

    如何实现第一张图里的多边形操作?

    
    disruptor.handleEventsWith(E1, E2);
    
    disruptor.after(E1).handleEventsWith(E3);
    
    disruptor.after(E2).handleEventsWith(E4);
    
    disruptor.after(E3, E4).handleEventsWith(E5);
    
    

    有兴趣的同学可以试一下!

    再了解了disruptor的核心概念和看了代码之后,就可以继续学习disruptor的多生产多消费模型了,disruptor的多线程才能发挥真正的力量!

    多生产多消费模型

    image

    简单看一下代码。。。

    image

    简单分析,多个生产者同时向ringbuffer投递数据,假设此时俩个生产者将ringbuffer已经填满,因为sequence的序号是自增+1(若不满足获取条件则循环挂起当前线程),所以生产的时候能保证线程安全,只需要一个sequence即可。当多消费者来消费的时候,因为消费速度不同,例如消费者1来消费0、1,消费者2消费2、4,消费者3消费3。当消费者消费完0后,消费者2消费完2后,消费者3消费完3后,生产者再往队列投递数据时,其他位置还未被消费,会投递到第0个位置, 此时再想投递数据时,虽然消费2的第二个位置空缺、消费者3的第三个位置空缺,消费者还在消费1时,无法继续投递。因为是通过比较消费者自身维护的sequence的最小的序号,来进行比较。

    应用:

    Apache Storm、Camel、Log4j 2

    Log4j2 example:

    使用了实现EventTranslator的提交机制。

    image

    可参考美团文章:https://tech.meituan.com/2016/11/18/disruptor.html中指出:美团在公司内部统一推行日志接入规范,要求必须使用Log4j 2,使普通单机QPS的上限不再只停留在几千,极高地提升了服务性能。

    over。~!

    相关文章

      网友评论

          本文标题:Disruptor quickStart!

          本文链接:https://www.haomeiwen.com/subject/fazmnctx.html