美文网首页
通信机制 Storm vs. Flink

通信机制 Storm vs. Flink

作者: 余楚倩 | 来源:发表于2018-11-27 10:04 被阅读0次

Storm通信机制

Storm和Flink都是流计算系统中比较成熟的开源实现,在一些流计算平台选型的文章中,两者也常常被拿来重点比较,但是两个平台都在持续更新,一些选型比较和压测结果文章都可能已经过时。而一个系统的核心实现往往比较少改动,因此本文尝试从底层实现的层面来比较两者的异同。

在上一篇文章中,我们自顶向下地了解了Flink的内部通信机制,本文按照同样的结构来说明Storm的通信机制。代码基于tag v1.2.2。

当我们编写一个Storm topology时,也是在描绘一个有向图。图中的边相当于由一系列数据记录组成的数据流,图中的顶点相当于我们对数据流的处理。

先简单介绍Storm的术语使用,并和Flink作对比,方便熟悉其中任意一个系统的同学更快地理解这些术语。术语介绍部分主要参考来源为Storm官方文档Concepts
Understanding the Parallelism of a Storm Topology

  • Spout: Spout是有向运行图中没有上游的节点,是数据流的起点。相当于Flink中的SourceOperator。
  • Bolt: 对数据流进行处理的节点,相当于Flink中的TransformationOperator
  • Component:Bolt和Spout的统称 ,相当于Flink中的Operator
  • Executor: 代表了Component的并行度。每个executor都会有一个工作线程,负责处理用户定义的业务逻辑,一个发送线程,负责把数据发送到下游队列。一个Executor中可以运行同一个component的多个实例。(相当于Flink中的SubTask)
  • Task: task代表一个component实例, 同一个Execotor中的task会被串行执行(要区别于并行度)
  • Worker: 代表一个进程,一个Work中可以运行多个Executor,相当于Flink中的TaskManager

术语对比:

Storm Flink
Worker TaskManager
Executor Task
Spout SourceOperator
Bolt TransformationOperator
Component Operator
Stream Stream
Task SubTask
? Chain

Storm的Tasks和Flink的Chain并不能等价,在storm中多个Task可以运行在一个Executor上,但这些Task指的是属于同一个component的不同实例,两个Task之间是是等价关系而不是上下游关系。Flink中一个Task中可以有多个Subtask , 在Flink中的Chain是指有上下游关系的且满足一定条件的多个Subtask可以成链的方式被当成一个Task处理。

Component间的通信实现

以Storm的WordCount为例,此例用Storm的底层API编写(相对Trident来说的底层),源码可以从Storm官方的Storm-starter模块获取。

storm编程模型

Storm的编程模型由Spout,Bolt和Stream组成。在图中split和count为Bolt, 灰色箭头为stream。图中总共有三种不同的component, 其中spout的实现类为RandomSentenceSpout,负责随机地从一个字符串数组中选择句子。split的功能是对句子进行分词。count功能是计算单词的出现次数。功能上与Flink的WordCount例子大同小异。

为了图例简洁和简化模型,在图中这三种executor的并行度分别为2、2、1,和代码中并行度不一致。这里没有开启acker和metric功能, 因此本文没有画出__acker和__metric两种系统实现的bolt,这acker部分会在分布式事务的对比中分析。

Storm没有实现Flink那样的Chain功能,上下游component不会位于同一个线程中,因此Storm的上下游component通信只有两种方式,本地线程通信或远程线程通信。

Storm内部每个executor都会有一个接收队列和一个发送队列,一个工作线程和一个发送线程。每个worker内部都会有一个发送队列,一个接收线程和一个发送线程。Storm中的队列按职能分了三类:分别为executor接收队列,executor发送队列和worker发送队列。这三类队列的消费者以sleep()的方式不断轮询来接收消息,接收消息后的处理结果publish到下游队列。

storm内部消息传递

有很多Storm的技术文章中画出了worker的接收线程和topology.receiver.buffer.size, 事实上在Storm1.0.x中 worker的接收线程已被移除,改为push的方式,在Storm server接收到消息后直接反序列化然后写到各个executor的receive-queue中。

Buffer的读写

Storm实现的生产者消费者模式使用到的缓存队列为LAMX Disruptor中的RingBuffer。LAMX Disruptor号称最快的无锁并发框架。在Storm的使用场景中,flush到RingBuffer时使用的等待策略为TimeoutBlockingWaitStrategy是通过ReentrantLock加锁阻塞的, 且flush到RingBuffer前也会通过锁来避免并发调用publishDirect(ArrayList<Object> objs, boolean block)方法。

Buffer写入

以Bolt->Bolt数据传输为例,Bolt中的tuple发送主要通过OutputCollector实现, 当一个bolt在execute()方法中调用了OutputCollector.emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) 后,它的调用栈如下图所示:
粉色部分为clojure实现,黄色部分是java实现。调用栈的最后调用了java实现的DisruptorQueue.publish(Object obj) 方法。DisruptorQueue是Storm对LAMX disruptor的封装,主要增加了批量发布和超时发布功能。由上一章节可知,tuple会被发布到一个名为executor$id-send-queue的DisruptorQueue中。

Buffer写入

DisruptorQueue的发布的代码逻辑比较复杂,主要通过ThreadLocalInserter和Flusher分别实现Tuple的批量发布和超时发布。
批量发布部分主要实现如下:DisruptorQueue的公共方法publish(Object obj)中,先后调用 ThreadLocalInserter的add和flush方法

/**
变量解释:
batcher即为ThreadLocalInserter对象实例
**/
public void publish(Object obj) {
    ... 
    batcher.add(obj);
    batcher.flush(false);
}
  • batcher.add(obj)方法的功能是把tuple放进当前批次的缓存中,如果当前批次_currentBatch满了,且当前没有发送失改批次(为了保证顺序性,未发送成功的批次需要先发送),会触发flush到disruptor的ringbuffer。但不保证flush成功,如果因为ringbuffer空间不足flush失败,会把失败的批次放进无界队列_overflow中缓存。注意:add方法不阻塞。

  • batcher.flush(boolean block)的功能是触发发送失败的批次flush到ringbuffer中。该方法还可能在定时调度的Flusher线程中被调用。注意:block==false时,flush方法不阻塞。

综上,RingBuffer中的发布单元为一个批次大小的tuple(而不是单个tuple),publish方法不会阻塞,_overflow是个无界非阻塞队列。因此,如果下游处理不及时且上游持续生产数据时,可能因为_overflow中缓存的对象过多而发生OOM。Storm提供了两种方式来避免这种情况,留在后续Storm和flink实现对比再讨论。

/**
变量解释:
_currentBatch为ArrayList<Object>对象实例, 用于缓存当前批次的tuple
_overflow为ConcurrentLinkedList<ArrayList<Object>>,无界队列,用于缓存发送失败的tuple batch
_inputBatchSize为当前批次的最大缓存tuple数

**/
    public synchronized void add(Object obj) {
    ...
        //如果当前批次已满
        if (_currentBatch.size() >= _inputBatchSize) {
            boolean flushed = false;
            //如果当前批次已满且缓存中没有发送失败的批次
            if (_overflow.isEmpty()) {
                try {
                  //发布到disruptor的ringbuffer中,非阻塞,当ringbuffer空间不足时抛出InsufficientCapacityException
                    publishDirect(_currentBatch, false);
                    _overflowCount.addAndGet(0 - _currentBatch.size());
                    _currentBatch.clear();
                    flushed = true;
                } catch (InsufficientCapacityException e) {
                    //Ignored we will flush later
                }
            }
            //如果当前批次已满 且 (缓存中有发送失败的批次 或 当前批次发送失败)
            if (!flushed) {
          //把当前批次加入到未发送失败的缓存队列中
                _overflow.add(_currentBatch);
                _currentBatch = new ArrayList<Object>(_inputBatchSize);
            }
        }
    }

    //May be called by a background thread
    public void flush(boolean block) {
        if (block) {
            _flushLock.lock();
        } else if (!_flushLock.tryLock()) {
            //Someone else if flushing so don't do anything
            return;
        }
        try {
            while (!_overflow.isEmpty()) {
                publishDirect(_overflow.peek(), block);
                _overflowCount.addAndGet(0 - _overflow.poll().size());
            }
        } catch (InsufficientCapacityException e) {
        //Ignored we should not block
        } finally {
            _flushLock.unlock();
        }
    }

到这里,看过Flink通信机制的同学应该明白“Flink的反压机制实现得更天然”的说法了。

DisruptorQueue是底层实现,直接暴露给用户的发送数据到下游的接口是output collector。Storm output collector的实现相较Flink混乱,存在两个问题:

  1. Collector命名比较混乱,例如有的实现类叫XXCollectorImpl,有的又不带Impl后缀 , ISpoutOutputCollector和IOutputCollector是两个完全不同的接口,两者不在同一继承树中,分别实现Spout的数据发送接口和Bolt的数据发送接口。不通过关键字搜索比较难找出全部实现了“tuple发送”功能的代码。
  2. Storm的collector实现耦合了tuple的发送逻辑和tuple的ack fail逻辑,因为ack/fail逻辑不同而划分了两种主要的OutputCollector , 分别是负责发送Spout tuple的ISpoutOutputCollector、负责发送IRichBolt tuple的IOutputCollector、其它Collector基本上是通过委托模式基于这两个Collector实现的。例如带有自动自动ack/fail tuple功能的IBasicOutputCollector,这个类把tuple发送逻辑委托给OutputCollector,而java实现的OutputCollector最后会委托给由clojure代码executor.clj中实现的IOutputCollector匿名类。

Storm有两个批量处理框架,相关框架的实现类分别以Transactional和Trident开头,Transactional开头的批处理实现已经被标记为废弃,现主要维护Trident的实现。这两个API中提供给用户编程使用的ITridentSpout和ITransactionalSpout 最后都会在Bolt所在的executor中调用,所以批处理编程API中的Spout使用的Collector实际父类或委托类为IOutputCollector。

output collector实现

Buffer读取

RingBuffer的读取和处理逻辑通过com.lmax.disruptor.EventHandler接口实现,executor中的工作线程和发送线程以及worker中的发送线程都分别实现了该接口。以executor工作线程为例,executor工作线程读取event后转换为Tuple, 并调用IBolt.execute(Tuple tuple)接口触发用户实现的业务逻辑。

buffer读取

上图的逻辑在一个轮询间隔为0的无限循环中: 当队列空闲时,cpu空转。

(defnk consume-loop*
  [^DisruptorQueue queue handler
   :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
  (async-loop    ;;定义一个循环
          (fn [] (consume-batch-when-available queue handler) 0) ;;此处返回0, 代表sleep-time
          :kill-fn kill-fn  ;;接收到kill信号时执行的清理逻辑
          :thread-name (.getName queue)))  ;;线程名称

;; afn returns amount of time to sleep
(defnk async-loop [afn
                   :daemon false
                   :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))
                   :priority Thread/NORM_PRIORITY
                   :factory? false
                   :start true
                   :thread-name nil]
           ... 
                         (let [sleep-time (afn)]
                           (when-not (nil? sleep-time)
                             (sleep-secs sleep-time)
                             (recur))     ;;循环调用
                           )
           ...

Storm和Flink对比

- Storm Flink
队列 Disruptor ArrayDeque+synchronized
队列有无锁 有锁,使用ReentrantLock 有锁,使用synchronized
队列有无等待 等待,默认Condiction.await(timeout) 等待,使用wait/notify
缓存 有缓存,用ArrayList和ConcurrentLinkedList 有缓存,用自定义的MemorySegment和ArrayDeque
缓存大小 可配置,默认100条,和message大小无关 可配置,默认最小32768 byte,和条数无关,是消息序列化后的大小(消息可以跨多个buffer)
生产方式 多生产者 多生产者
消费方式 单消费者 单消费者
序列化 默认kryo 自定义
序列化的时机 远程通信时 写入缓存时(因此本地线程通信也会序列化)
队列数 每个工作线程一个接收队列(both spout and bolt),每 每个工作线程一个消费队列(source除外)

相关文章

网友评论

      本文标题:通信机制 Storm vs. Flink

      本文链接:https://www.haomeiwen.com/subject/iwwwqqtx.html