美文网首页
Async I/O 的实现原理

Async I/O 的实现原理

作者: 专职掏大粪 | 来源:发表于2020-05-21 17:52 被阅读0次

在使用 Flink 处理实时数据流时,经常需要和外部系统进行交互。例如,在构建实时数据仓库的时候,通常需要将消息和外部维表进行关联,以获得额外的维度数据。由于外部系统的响应时间和网络延迟可能会很高,如果采用同步调用的方式,那么外部调用的高延迟势必会影响到系统的吞吐量,进而成为系统的瓶颈。这种情况下,我们需要采用异步调用的方式。异步调用相比于同步调用,不同请求的等待时间可以重叠,从而提升了吞吐率。

Async I/O 的使用方式

在 Flink 中使用 Async I/O 的需要有一个支持异步请求的客户端。以官方文档给出的说明为例:

/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

   /** The database specific client that can issue concurrent requests with callbacks */
   lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

   /** The context used for the future callbacks */
   implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

       // issue the asynchronous request, receive a future for the result
       // 发起异步请求,返回结果是一个 Future
       val resultFutureRequested: Future[String] = client.query(str)

       // set the callback to be executed once the request by the client is complete
       // the callback simply forwards the result to the result future
       // 请求完成时的回调,将结果交给 ResultFuture
       resultFutureRequested.onSuccess {
           case result: String => resultFuture.complete(Iterable((str, result)))
       }
   }
}

// create the original stream
val stream: DataStream[String] = ...

// 应用 async I/O 转换,设置等待模式、超时时间、以及进行中的异步请求的最大数量
val resultStream: DataStream[(String, String)] =
   AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

AsyncDataStream 提供了两种调用方法,分别是 orderedWait 和 unorderedWait,这分别对应了有序和无序两种输出模式。之所以会提供两种输出模式,是因为异步请求的完成时间是不确定的,先发出的请求的完成时间可能会晚于后发出的请求。在“有序”的输出模式下,所有计算结果的提交完全和消息的到达顺序一致;而在“无序”的输出模式下,计算结果的提交则是和请求的完成顺序相关的,先处理完成的请求的计算结果会先提交。值得注意的是,在使用“事件时间”的情况下,“无序”输出模式仍然可以保证 watermark 的正常处理,即在两个 watermark 之间的消息的异步请求结果可能是异步提交的,但在 watermark 之后的消息不能先于该 watermark 之前的消息提交。

由于异步请求的完成时间不确定,需要设置请求的超时时间,并配置同时进行中的异步请求的最大数量。

Async I/O 的实现

AsyncDataStream 在运行时被转换为 AsyncWaitOperator 算子,它是 AbstractUdfStreamOperator 的子类。下面我们来看看 AsyncWaitOperator 的实现原理。

基本原理

AsyncWaitOperator 算子相比于其它算子的最大不同在于,它的输入和输出并不是同步的。因此,在 AsyncWaitOperator 内部采用了一种 “生产者-消费者” 模型,基于一个队列解耦异步计算和计算结果的提交。StreamElementQueue 提供了一种队列的抽象,一个“消费者”线程 Emitter 从中取出已完成的计算结果,并提交给下游算子,而异步请求则充当了队列“生产者”的角色。基本的处理逻辑如下图所示

image.png
  • AsyncWaitOperator
//用于提交队列中完成的队列元素
private transient Emitter<OUT> emitter;

public void processElement(StreamRecord<IN> element) throws Exception {
        final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
        if (timeout > 0L) {
            long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
             //注册一个定时器,在超时时调用 timeout 方法
            final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
                timeoutTimestamp,
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                    }
                });

            // Cancel the timer once we've completed the stream record buffer entry. This will remove
            // the register trigger task
            streamRecordBufferEntry.onComplete(
                (StreamElementQueueEntry<Collection<OUT>> value) -> {
                    timerFuture.cancel(true);
                },
                executor);
        }
        //加入队列
        addAsyncBufferEntry(streamRecordBufferEntry);
//发送异步请求,执行用户方法进行异步调用
        userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
    }

private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        pendingStreamElementQueueEntry = streamElementQueueEntry;
    //尝试将待完成的请求加入队列,如果队列已满(到达异步请求的上限),会阻塞
        while (!queue.tryPut(streamElementQueueEntry)) {
            checkpointingLock.wait();
        }
        pendingStreamElementQueueEntry = null;
    }
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
    implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {

    /** Timestamp information. */
    private final boolean hasTimestamp;
    private final long timestamp;

    //包含执行结果的Future
    private final CompletableFuture<Collection<OUT>> resultFuture;

    public StreamRecordQueueEntry(StreamRecord<?> streamRecord) {
        super(streamRecord);
        hasTimestamp = streamRecord.hasTimestamp();
        timestamp = streamRecord.getTimestamp();
//StreamRecordQueueEntry 内部包装了CompletableFuture(新建了一个异步调用的Future)
        resultFuture = new CompletableFuture<>();
    }
}

Emitter线程线程消费队列数据,发送到指定的输出.(典型的生产者消费者模型)

@Override
    public void run() {
        try {
            while (running) {
//以阻塞的方式从队列peek已完成的数据
                AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
                output(streamElementEntry);
            }
        } 
    }

    private void output(AsyncResult asyncResult) throws InterruptedException {
  //判断完成的结果是水印
        if (asyncResult.isWatermark()) {
            synchronized (checkpointLock) {
                AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
                LOG.debug("Output async watermark.");
                                 //发送水印
                output.emitWatermark(asyncWatermarkResult.getWatermark());
                                //从队列移除元素
                streamElementQueue.poll();
                // 通知主线程有队列有空闲空间,可以放元素了
                checkpointLock.notifyAll();
            }
        } else {
                  //异步完成的结果为数据类型
            AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();
            if (streamRecordResult.hasTimestamp()) {
                timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
            } else {
                timestampedCollector.eraseTimestamp();
            }
            synchronized (checkpointLock) {
                LOG.debug("Output async stream element collection result.");
                try {
                    Collection<OUT> resultCollection = streamRecordResult.get();
                    if (resultCollection != null) {
                        for (OUT result : resultCollection) {
                                                         //发送数据
                            timestampedCollector.collect(result);
                        }
                    }
                } 
        streamElementQueue.poll();
        checkpointLock.notifyAll();
            }
        }
    }

AsyncWaitOperator 可以工作在两种模式下,即 ORDEREDUNORDERED。Flink 通过 StreamElementQueue 的不同实现实现了这两种模式。
“有序”模式下,所有异步请求的结果必须按照消息的到达顺序提交到下游算子。在这种模式下,StreamElementQueue 的具体是实现是 OrderedStreamElementQueue。OrderedStreamElementQueue 的底层是一个有界的队列,异步请求的计算结果按顺序加入到队列中,只有队列头部的异步请求完成后才可以从队列中获取计算结果

image.png
public class OrderedStreamElementQueue implements StreamElementQueue {
    //队列容量
    private final int capacity;

    /** Executor to run the onCompletion callback. */
    private final Executor executor;
    /** Lock and conditions for the blocking queue. */
    private final ReentrantLock lock;   初始话为非公平,提高效率
         //队列有空闲空间的condition
    private final Condition notFull;
        //队列同步完成的 condition
    private final Condition headIsCompleted;

    /** Queue for the inserted StreamElementQueueEntries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> queue;

    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            //只有队列头部的请求完成或队列不为空后才解除阻塞状态
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            return queue.peek();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult poll() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (queue.isEmpty() || !queue.peek().isDone()) { 
                headIsCompleted.await();
            }
        //通知其他线程可以向队列放元素,队列目前not full
            notFull.signalAll();
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.size() >= capacity) {
                                //队列容量达到上限,阻塞.等待被唤醒
                notFull.await();
            }
            addEntry(streamElementQueueEntry);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (queue.size() < capacity) { //未达容量上限
                addEntry(streamElementQueueEntry);
                return true;
            } else {
                return false;
            }
        } finally {
            lock.unlock();
        }
    }
}

    private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            if (!queue.isEmpty() && queue.peek().isDone()) {
                  //callback 触发,队列头部请求已经完成.唤醒其他线程
                headIsCompleted.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

“无序”模式
“无序”模式下,异步计算结果的提交不是由消息到达的顺序确定的,而是取决于异步请求完成顺序。当然,在使用“事件时间”的情况下,要保证 watermark 语义的正确性。在使用“处理时间”的情况下,由于不存在 Watermark,因此可以看作一种特殊的情况。在 UnorderedStreamElementQueue 中巧妙地实现了这两种情况

image.png

UnorderedStreamElementQueue 内部使用了两个队列,ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue 中保存未完成的异步请求计算结果,而 completedQueue 中保存已完成的异步请求计算结果。注意,ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue 这个队列中的元素是异步请求计算结果的散列集合,从图中也可以看出, watermarkSet 作为一种特殊的集合,其内部只有一个元素,即 Watermark,充当了不同散列集合之间的分界。这样就保证了在一个 Watermark 之后的异步请求的计算结果不会先于Watermark 之前进行提交firstSet 中完成异步请求的计算结果会被转移到 completedQueue 队列中,firstSet 内部的所有异步请求的计算结果都是可以乱序提交的。

  • 如果不使用“事件时间”,那么没有 Watermark 产生,所有的异步请求都会进入 firstSet 中,因而所有的结果都是乱序提交的

  • firstSet:存储Watermark 之前的数据,完成的请求会通过onCompleteHandler方法从firstSet移除放入completedQueue,
    如果firstSet被消费完成则从uncompletedQueue拉取下一批待完成请求的请求Set集合

  • lastSet:目前看只用到了没开启水印的情况, onCompleteHandlerfirstSet != lastSet的判断条件

    private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
         //  开启水印
        if (streamElementQueueEntry.isWatermark()) {
          //为lastSet引用指向新set对象
            lastSet = new HashSet<>(capacity);
         //firstSet为空,将水印元素放入firstSet
            if (firstSet.isEmpty()) {
                firstSet.add(streamElementQueueEntry);
            } else {
         //firstSet不为空, 构造一个只包含这个 watermark 的 set 加入到uncompletedQueue 队列中
                Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
                watermarkSet.add(streamElementQueueEntry);
                uncompletedQueue.offer(watermarkSet);
            }
            uncompletedQueue.offer(lastSet);
        } else {
        // 没开启水印,元素直接放入firstSet(lastset==firstSet)
            lastSet.add(streamElementQueueEntry);
        }
        //设置异步请求完成后的回调
        streamElementQueueEntry.onComplete(
            (StreamElementQueueEntry<T> value) -> {
                try {
                    onCompleteHandler(value);
                } catch (InterruptedException e) {
                    ... 略
            },
            executor);

        numberEntries++;
    }


    public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {

            if (firstSet.remove(streamElementQueueEntry)) {
                completedQueue.offer(streamElementQueueEntry);
                             //对于未开启水印,不会走到while循环,因为firstSet == lastSet
                while (firstSet.isEmpty() && firstSet != lastSet) {
                                        1.firstSet为空,并且当前为水印元素所以代表当前水印之前的元素都处理完成
2.从uncompletedQueue拉取新的待完成的请求set赋值为firstSet
                    firstSet = uncompletedQueue.poll();
        
                    Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
3.循环遍历firstSet
                    while (it.hasNext()) {
                        StreamElementQueueEntry<?> bufferEntry = it.next();
                //看是否有已完成的请求,并放入completedQueue
同时从firstSet中清除
                        if (bufferEntry.isDone()) {
                            completedQueue.offer(bufferEntry);
                            it.remove();
                        }
                    }
                }

                LOG.debug("Signal unordered stream element queue has completed entries.");
                hasCompletedEntries.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

最后放上一个异步io的例子

private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
        private static final long serialVersionUID = 2098635244857937717L;

        private transient ExecutorService executorService;

        /**
         * The result of multiplying sleepFactor with a random float is used to pause
         * the working thread in the thread pool, simulating a time consuming async operation.
         */
        private final long sleepFactor;

        /**
         * The ratio to generate an exception to simulate an async error. For example, the error
         * may be a TimeoutException while visiting HBase.
         */
        private final float failRatio;

        private final long shutdownWaitTS;

        SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
            this.sleepFactor = sleepFactor;
            this.failRatio = failRatio;
            this.shutdownWaitTS = shutdownWaitTS;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            executorService = Executors.newFixedThreadPool(30);
        }

        @Override
        public void close() throws Exception {
            super.close();
            ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
        }

        @Override
        public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
            executorService.submit(() -> {
                // wait for while to simulate async operation here
                long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
                try {
                    Thread.sleep(sleep);

                    if (ThreadLocalRandom.current().nextFloat() < failRatio) {
                        resultFuture.completeExceptionally(new Exception("wahahahaha..."));
                    } else {
                        resultFuture.complete(
                            Collections.singletonList("key-" + (input % 10)));
                    }
                } catch (InterruptedException e) {
                    resultFuture.complete(new ArrayList<>(0));
                }
            });
        }
    }

容错

在异步调用模式下,可能会同时有很多个请求正在处理中。因而在进行快照的时候,需要将异步调用尚未完成,以及结果尚未提交给下游的消息加入到状态中。在恢复的时候,从状态总取出这些消息,再重新处理一遍。为了保证 exactly-once 特性,对于异步调用已经完成,且结果已经由 emitter 提交给下游的消息就无需保存在快照中。

public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, OperatorActions {
    /** Recovered input stream elements. */
    private transient ListState<StreamElement> recoveredStreamElements;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        recoveredStreamElements = context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));

    }

    @Override
    public void open() throws Exception {
        super.open();
        
        //......

        // 状态恢复的时候,从状态中取出所有为完成的消息,重新处理一遍
        if (recoveredStreamElements != null) {
            for (StreamElement element : recoveredStreamElements.get()) {
                if (element.isRecord()) {
                    processElement(element.<IN>asRecord());
                }
                else if (element.isWatermark()) {
                    processWatermark(element.asWatermark());
                }
                else if (element.isLatencyMarker()) {
                    processLatencyMarker(element.asLatencyMarker());
                }
                else {
                    throw new IllegalStateException("Unknown record type " + element.getClass() +
                        " encountered while opening the operator.");
                }
            }
            recoveredStreamElements = null;
        }
    }


    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);

        //先清除状态
        ListState<StreamElement> partitionableState =
            getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
        partitionableState.clear();

        //将所有未完成处理请求对应的消息加入状态中
        Collection<StreamElementQueueEntry<?>> values = queue.values();
        try {
            for (StreamElementQueueEntry<?> value : values) {
                partitionableState.add(value.getStreamElement());
            }

            // add the pending stream element queue entry if the stream element queue is currently full
            if (pendingStreamElementQueueEntry != null) {
                partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
            }
        } catch (Exception e) {
            partitionableState.clear();
            throw new Exception("Could not add stream element queue entries to operator state " +
                "backend of operator " + getOperatorName() + '.', e);
        }
    }

}

转自:
https://blog.jrwang.me/2019/flink-source-code-async-io/

相关文章

  • Async I/O 的实现原理

    在使用 Flink 处理实时数据流时,经常需要和外部系统进行交互。例如,在构建实时数据仓库的时候,通常需要将消息和...

  • Java I/O体系从原理到应用,这一篇全说清楚了

    本文介绍操作系统I/O工作原理,Java I/O设计,基本使用,开源项目中实现高性能I/O常见方法和实现,彻底搞懂...

  • 聊聊flink的Async I/O

    序 本文主要研究一下flink的Async I/O 实例 本实例展示了flink Async I/O的基本用法,首...

  • 设备管理(一)

    目录 5.1 I/O硬件原理 I/O系统 I/O控制方式 设备控制器 5.2 I/O软件原理 I/O软件设计目标和...

  • 阅读源码的目的

    深入了解架构设计与实现原理 I/O 同步是指用户线程发起 I/O 请求后需要等待或者轮询内核 I/O 完成后再继续...

  • flink Async I/O

    Hbase端代码示例 这里使用线程池模拟异步IO,本人Hbase实验版本为1.30,Hbase 2.0才开始支持异...

  • Hadoop3 YARN集群中的磁盘I / O调度设计详解1

    问题导读 1.磁盘IO实现共享,使用的是什么原理技术? 2.如何描述磁盘I / O资源? 3.磁盘I / O资源中...

  • 图文详解 epoll 原理【Redis,Netty,Nginx实

    【Redis,Netty,Nginx 等实现高性能IO的核心原理】 I/O 输入输出(input/output)的...

  • Java NIO入门指南

    前言 本博客只简单介绍NIO的原理实现和基本工作流程 I/O和NIO的本质区别 NIO将填充和提取缓冲区的I/O操...

  • es6解读 - async函数

    async函数返回promise对象 async 函数的实现原理 所有的async函数都可以写成上面的第二种形式,...

网友评论

      本文标题:Async I/O 的实现原理

      本文链接:https://www.haomeiwen.com/subject/yxlzohtx.html