美文网首页爬虫专题
Disruptor 实践:整合到现有的爬虫框架

Disruptor 实践:整合到现有的爬虫框架

作者: fengzhizi715 | 来源:发表于2018-12-04 11:19 被阅读617次
    秋天的颜色.jpg

    一. Disruptor

    Disruptor 是一个高性能的异步处理框架。

    Disruptor 是 LMAX 在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。其实 Disruptor 与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案。

    二. 实践

    NetDiscovery 是基于 Vert.x、RxJava 2 等框架实现的爬虫框架。

    NetDiscovery 默认的消息队列采用 JDK 的 ConcurrentLinkedQueue,由于爬虫框架各个组件都可以被替换,所以下面基于 Disruptor 实现爬虫的 Queue。

    2.1 事件的封装

    将爬虫的 request 封装成一个 RequestEvent,该事件会在 Disruptor 中传输。

    import com.cv4j.netdiscovery.core.domain.Request;
    import lombok.Data;
    
    /**
     * Created by tony on 2018/9/1.
     */
    @Data
    public class RequestEvent {
    
        private Request request;
    
        public String toString() {
    
            return request.toString();
        }
    }
    

    2.2 发布事件

    下面编写事件的发布,从 RingBuffer 中获取下一个可写入事件的序号,将爬虫要请求的 request 设置到 RequestEvent 事件中,最后将事件提交到 RingBuffer。

    import com.cv4j.netdiscovery.core.domain.Request;
    import com.lmax.disruptor.RingBuffer;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Created by tony on 2018/9/2.
     */
    public class Producer {
    
        private final RingBuffer<RequestEvent> ringBuffer;
    
        private AtomicInteger count = new AtomicInteger(0); // 计数器
    
        public Producer(RingBuffer<RequestEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void pushData(Request request){
            long sequence = ringBuffer.next();
    
            try{
                RequestEvent event = ringBuffer.get(sequence);
                event.setRequest(request);
            }finally {
                ringBuffer.publish(sequence);
                count.incrementAndGet();
            }
        }
    
        /**
         * 发送到队列中到Request的数量
         * @return
         */
        public int getCount() {
    
            return count.get();
        }
    }
    

    2.3 消费事件

    RequestEvent 设置了 request 之后,消费者需要处理具体的事件。下面的 Consumer 仅仅是记录消费者的线程名称以及 request。真正的“消费”还是需要从 DisruptorQueue 的 poll() 中获取 request ,然后在 Spider 中进行“消费”。

    import com.lmax.disruptor.WorkHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Created by tony on 2018/9/2.
     */
    @Slf4j
    public class Consumer implements WorkHandler<RequestEvent> {
    
        @Override
        public void onEvent(RequestEvent requestEvent) throws Exception {
    
            log.info("consumer:" + Thread.currentThread().getName() + " requestEvent: value=" + requestEvent.toString());
        }
    }
    

    2.4 DisruptorQueue 的实现

    Disruptor 支持单生产者单消费者、多生产者、多消费者、分组等方式。

    NetDiscovery 中采用多生产者多消费者。

    在 RingBuffer 创建时,ProducerType 使用 MULTI 类型表示多生产者。创建 RingBuffer 采用了 YieldingWaitStrategy 。YieldingWaitStrategy 是一种WaitStrategy,不同的 WaitStrategy 会有不同的性能。

    YieldingWaitStrategy 性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。

            ringBuffer = RingBuffer.create(ProducerType.MULTI,
                    new EventFactory<RequestEvent>() {
                        @Override
                        public RequestEvent newInstance() {
                            return new RequestEvent();
                        }
                    },
                    ringBufferSize ,
                    new YieldingWaitStrategy());
    

    EventProcessor 用于处理 Disruptor 中的事件。

    EventProcessor 的实现类包括:BatchEventProcessor 用于单线程批量处理事件,WorkProcessor 用于多线程处理事件。

    WorkerPool 管理着一组 WorkProcessor。创建完 ringBuffer 之后,创建 workerPool:

            SequenceBarrier barriers = ringBuffer.newBarrier();
    
            for (int i = 0; i < consumers.length; i++) {
                consumers[i] = new Consumer();
            }
    
            workerPool = new WorkerPool<RequestEvent>(ringBuffer,
                            barriers,
                            new EventExceptionHandler(),
                            consumers);
    

    启动 workerPool:

            ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
            workerPool.start(Executors.newFixedThreadPool(threadNum));
    

    最后是 DisruptorQueue 完整的代码:

    import com.cv4j.netdiscovery.core.domain.Request;
    import com.cv4j.netdiscovery.core.queue.AbstractQueue;
    import com.lmax.disruptor.*;
    import com.lmax.disruptor.dsl.ProducerType;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Created by tony on 2018/9/1.
     */
    @Slf4j
    public class DisruptorQueue extends AbstractQueue {
    
        private RingBuffer<RequestEvent> ringBuffer;
    
        private Consumer[] consumers = null;
        private Producer producer = null;
        private WorkerPool<RequestEvent> workerPool = null;
        private int ringBufferSize = 1024*1024; // RingBuffer 大小,必须是 2 的 N 次方
    
        private AtomicInteger consumerCount = new AtomicInteger(0);
    
        private static final int CONSUME_NUM = 2;
        private static final int THREAD_NUM = 4;
    
        public DisruptorQueue() {
    
            this(CONSUME_NUM,THREAD_NUM);
        }
    
        public DisruptorQueue(int consumerNum,int threadNum) {
    
            consumers = new Consumer[consumerNum];
    
            //创建ringBuffer
            ringBuffer = RingBuffer.create(ProducerType.MULTI,
                    new EventFactory<RequestEvent>() {
                        @Override
                        public RequestEvent newInstance() {
                            return new RequestEvent();
                        }
                    },
                    ringBufferSize ,
                    new YieldingWaitStrategy());
    
            SequenceBarrier barriers = ringBuffer.newBarrier();
    
            for (int i = 0; i < consumers.length; i++) {
                consumers[i] = new Consumer();
            }
    
            workerPool = new WorkerPool<RequestEvent>(ringBuffer,
                            barriers,
                            new EventExceptionHandler(),
                            consumers);
    
            ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
            workerPool.start(Executors.newFixedThreadPool(threadNum));
    
            producer = new Producer(ringBuffer);
        }
    
        @Override
        protected void pushWhenNoDuplicate(Request request) {
    
            producer.pushData(request);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public Request poll(String spiderName) {
    
            Request request = ringBuffer.get(ringBuffer.getCursor() - producer.getCount() +1).getRequest();
            ringBuffer.next();
            consumerCount.incrementAndGet();
            return request;
        }
    
        @Override
        public int getLeftRequests(String spiderName) {
    
            return producer.getCount()-consumerCount.get();
        }
    
        public int getTotalRequests(String spiderName) {
    
            return super.getTotalRequests(spiderName);
        }
    
        static class EventExceptionHandler implements ExceptionHandler {
    
            public void handleEventException(Throwable ex, long sequence, Object event) {
    
                log.debug("handleEventException:" + ex);
            }
    
            public void handleOnStartException(Throwable ex) {
    
                log.debug("handleOnStartException:" + ex);
            }
    
            public void handleOnShutdownException(Throwable ex) {
    
                log.debug("handleOnShutdownException:" + ex);
            }
        }
    }
    

    其中,pushWhenNoDuplicate() 是将 request 发送到 ringBuffer 中。poll() 是从 ringBuffer 中取出对应的 request ,用于爬虫进行网络请求、解析请求等处理。

    总结:

    爬虫框架 github 地址:https://github.com/fengzhizi715/NetDiscovery

    上述代码是比较经典的 Disruptor 多生产者多消费者的代码,亦可作为样板代码使用。

    最后,在爬虫框架是面向接口编程的,所以替换其中的任意组件都比较方便。

    该系列的相关文章:
    从API到DSL —— 使用 Kotlin 特性为爬虫框架进一步封装
    使用Kotlin Coroutines简单改造原有的爬虫框架
    为爬虫框架构建Selenium模块、DSL模块(Kotlin实现)
    基于Vert.x和RxJava 2构建通用的爬虫框架

    相关文章

      网友评论

      本文标题:Disruptor 实践:整合到现有的爬虫框架

      本文链接:https://www.haomeiwen.com/subject/nyhnqqtx.html