美文网首页
Disruptor-04 消费之handleEventsWith

Disruptor-04 消费之handleEventsWith

作者: rock_fish | 来源:发表于2020-06-09 00:06 被阅读0次

    1. 使用handleEventsWith 指定消费者

    • handleEventsWith 可配置多个EventHandler,为其中的每个EventHandler创建一个线程.
    • 对于消费者来说,只要有元素可以消费,就进行消费
    • 多消费者模式下,不是等到1个元素被所有消费者都消费完了,才继续一起同时开始消费下一个;消费者互相之间是不等待的,只要当前元素消费完了,还有元素可以消费,当前消费者就继续消费下一个元素
    • 多个消费者的情况下,每个消费者都顺序消费RingBuffer的消息,只是消费速度不同

    1.1非批量发布事件

    • 当生产者把RingBuffer放满时,生产者进入等待状态,直到最慢的一个消费者消费掉一个元素(即等到至少有一个位置可写入).

    1.2批量发布事件publishEvents

    • 批量写入的数据个数,不能超过RingBuffer的大小;如RingBuffer的大小是4,则这一批的数据只能<=4 ,否则报错.

    • 批量发布事件对象,消费者 也 按照发布的批次数据来消费;这个批次的数据作为一个整体来作为是否有可写入空间;比如一次批写入是2个事件,那么就需要有2个可写入位置的时候才写入;同样如果一次批写入是4个事件,那么就需要有4个可写入位置的时候才写入.

    1.3 写入策略

    • 对于publishEvent当消费速度慢,导致没有位置可写入的时候,生产者要干巴巴的等待着
    • 有没有写入控制策略比如写入超时?放弃写入,尝试写入? 答案是:tryxxx,通过这个tryPublishEvent ,tryPublishEvents我们可以自己实现一个超时写入策略:失败丢弃,限时等待,多次尝试...

    1.4 批量消费数据

    消费者在执行消费的时候,要查看一下已经有哪些数据可消费了,即获取可消费的事件的最大下标,然后就通过循环遍历的方式,将当前消费位置,到最大消费位置之间的数据连续的消费掉;而不是消费一个,查看一下下一个位置是不是可以消费.

    源码

    事件类

    public class LongEvent {
        private int id;
        private long value;
    
        public LongEvent(int id) {
            this.id = id;
        }
    
        public int getId() {
            return id;
        }
    
    
        public long getValue() {
            return value;
        }
    
        public void set(long value)
        {
            this.value = value;
        }
    
        @Override
        public String toString() {
            return "LongEvent{" +
                    "id=" + id +
                    ", value=" + value +
                    '}';
        }
    }
    

    事件工厂类

    public class LongEventFactory implements EventFactory<LongEvent>
    {
    
        private static int counter =0;
        public LongEvent newInstance()
        {
            LongEvent longEvent =  new LongEvent(counter++);
            System.out.println("new Event:" + longEvent.getId());
            return  longEvent;
        }
    }
    

    快消费

    public class LongEventHandler implements EventHandler<LongEvent> {
        
        @Override
        public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
            //System.out.println("消费 第" + l + "个Event对象,其Id为:" + longEvent.getId() + "其内容为:" + longEvent.getValue());
            System.out.println(LocalDateTime.now()+ "  快消费线程" + Thread.currentThread().getId() + " 消费 第" + l+ " 个 Event对象,其id 为:"+longEvent.getId()+ " ,读取其值为:" + longEvent.getValue());
        }
    }
    

    慢消费

    public class SlowEventHandler implements EventHandler<LongEvent> {
        
        @Override
        public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
            TimeUnit.SECONDS.sleep(3);
            System.out.println(LocalDateTime.now()+ "  慢消费线程" + Thread.currentThread().getId() + " 消费 第" + l+ " 个 Event对象,其id 为:"+longEvent.getId()+ " ,读取其值为:" + longEvent.getValue());
        }
    }
    

    事件转换器

    /**
     * 复用对象,仅变更对象中的属性值
     */
    public class LongEventTranslator  implements EventTranslatorOneArg<LongEvent,Long> {
        @Override
        public void translateTo(LongEvent longEvent, long l, Long aLong) {
            System.out.println(LocalDateTime.now()+ "  生产线程" + Thread.currentThread().getId() + " 发布 第" + l+ " 个 Event对象,其id 为:"+longEvent.getId()+ " ,设置其值为:" + aLong);
            longEvent.set(aLong);
        }
    }
    

    生产者

    public class LongEventProducer {
        private RingBuffer<LongEvent> ringBuffer;
    
        public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
    
        public void publishEvent(Long aLong) {
    
            EventTranslatorOneArg<LongEvent, Long> translator = new LongEventTranslator();
            /**
             * 生产者阻塞等待,直到有ringbuffer中有空闲位置可写入
             */
            ringBuffer.publishEvent(translator, aLong);
        }
    
        public void tryPublishEvent(Long aLong) {
    
            EventTranslatorOneArg<LongEvent, Long> translator = new LongEventTranslator();
    
            /**
             * 生产者尝试往ringbuffer写入:
             * 1. 若可写入,则写入并返回成功
             * 2. 若无空闲位置可写入,则直接返回false,不阻塞等待
             */
            boolean publishSucess = ringBuffer.tryPublishEvent(translator, aLong);
            if(!publishSucess){
                System.out.println("尝试发布失败...");
            }
    
        }
    
        public void publishEvents(Long[] aLongs) {
    
            /**
             * 批量发布
             */
            EventTranslatorOneArg<LongEvent, Long> translator = new LongEventTranslator();
            ringBuffer.publishEvents(translator, aLongs);
    
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
    
            // Executor that will be used to construct new threads for consumers
            Executor executor = Executors.newCachedThreadPool();
    
            // 事件工厂,用于创建event
            LongEventFactory factory = new LongEventFactory();
    
            // 指定ringbuf的大小,必须是2的整数倍
            int bufferSize = 4;
    
            // 构建一个 Disruptor
            Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);
    
            System.out.println("..new Disruptor...");
    
            // 给disruptor中添加消费者
            disruptor.handleEventsWith(new LongEventHandler());//快消费,
            disruptor.handleEventsWith(new SlowEventHandler());//慢消费,
            System.out.println("..handleEventsWith...");
    
            // 启动disruptor
            disruptor.start();
            System.out.println("..start...");
    
            //-----------万事俱备,只欠消息(消息的生产者投递消息)
    
            // Get the ring buffer from the Disruptor to be used for publishing.
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    
            LongEventProducer producer = new LongEventProducer(ringBuffer);
    
            // 跟blockqueue 比对一下
    
            /*for (long l = 0; l<8; l++)
            {
                long startAt = System.currentTimeMillis();
                producer.publishEvent(l);
                long endAt = System.currentTimeMillis();
                //System.out.println(endAt-startAt);
                //Thread.sleep(1000);
            }*/
    
    
            /* 按照Ringbuffer大小,批发布
            producer.publishEvents(new Long[]{0L,1L,2L,3L});
            producer.publishEvents(new Long[]{4L,5L,6L,7L});
            producer.publishEvents(new Long[]{8L,9L,10L,11L});
            producer.publishEvents(new Long[]{12L,13L,14L,15L});*/
            
            // 小于Ringbuffer大小批写入
            producer.publishEvents(new Long[]{0L,1L});
            producer.publishEvents(new Long[]{2L,3L});
            producer.publishEvents(new Long[]{4L,5L});
            producer.publishEvents(new Long[]{6L,7L});
    
        }
    

    相关文章

      网友评论

          本文标题:Disruptor-04 消费之handleEventsWith

          本文链接:https://www.haomeiwen.com/subject/fqqyzhtx.html