美文网首页
flink异步io

flink异步io

作者: 熊云昆 | 来源:发表于2022-12-23 22:26 被阅读0次

    最近在项目中用到了flink异步io模式去查询redis,相比于之前的同步访问模式,性能提升了好几倍,感叹异步io模式的强大,趁着这段时间有空好好看了一下异步io模式的实现源码。
    flink源码中异步io的实现逻辑都在AsyncWaitOperator这个类中:

    @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            // add element first to the queue
            final ResultFuture<OUT> entry = addToWorkQueue(element);
    
            final ResultHandler resultHandler = new ResultHandler(element, entry);
    
            // register a timeout for the entry if timeout is configured
            if (timeout > 0L) {
                resultHandler.registerTimeout(getProcessingTimeService(), timeout);
            }
    
            userFunction.asyncInvoke(element.getValue(), resultHandler);
        }
    

    主要流程是:
    1.先把输入数据放入到队列中
    2.将输入数据封装为ResultHandler对象,这个对象在异步回调时会传入进去
    3.调用asyncInvoke方法
    异步io分为两种模式:

    有序模式

    实现方式是采用ArrayDeque来实现,当resultHandler有回调返回的时候,会同时更新ArrayDeque中的元素状态,然后会判断ArrayDeque队列第一个元素是否有结果,如果有则弹出执行

    @Override
        public boolean hasCompletedElements() {
            return !queue.isEmpty() && queue.peek().isDone();
        }
    
        @Override
        public void emitCompletedElement(TimestampedCollector<OUT> output) {
            if (hasCompletedElements()) {
                final StreamElementQueueEntry<OUT> head = queue.poll();
                head.emitResult(output);
            }
        }
    

    无序模式

    无序模式实现的方式也比较简单,通过定义的Segment来实现:

    Segment(int initialCapacity) {
       incompleteElements = new HashSet<>(initialCapacity);
       completedElements = new ArrayDeque<>(initialCapacity);
    }
    /** Signals that an entry finished computation. */
    void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
       // adding only to completed queue if not completed before
       // there may be a real result coming after a timeout result, which is updated in the
       // queue entry but
       // the entry is not re-added to the complete queue
       if (incompleteElements.remove(elementQueueEntry)) {
           completedElements.add(elementQueueEntry);
        }
    }
    int emitCompleted(TimestampedCollector<OUT> output) {
         final StreamElementQueueEntry<OUT> completedEntry = completedElements.poll();
         if (completedEntry == null) {
             return 0;
         }
         completedEntry.emitResult(output);
         return 1;
    }
    

    可以看到,通过定义了一个HashSet来存放未完成数据回调的incompleteElements对象,因为HashSet也是无序的,而在完成回调后,调用completed方法,将数据从HashSet移除,放入到completedElements再调用emitCompleted方法将数据发给下游

    一点总结

    1.flink异步io接口必须封装在外部存储io异步访问接口中使用才有效,如果外部存储io接口只支持同步,那其实实现效果跟同步访问一样,性能并没有提升
    2.使用flink异步io后,在运行过程中默认会断开算子链,不过目前看性能并没有损失多少,可以接受

    相关文章

      网友评论

          本文标题:flink异步io

          本文链接:https://www.haomeiwen.com/subject/hjriqdtx.html