A Look at Flink's Async I/O

Author: go4it | Published: 2019-01-19 11:40

    This article takes a look at Flink's Async I/O.

    Example

    // This example implements the asynchronous request and callback with Futures that have the
    // interface of Java 8's futures (which is the same one followed by Flink's Future)
    
    /**
     * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
     */
    class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
    
        /** The database specific client that can issue concurrent requests with callbacks */
        private transient DatabaseClient client;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            client = new DatabaseClient(host, port, credentials);
        }
    
        @Override
        public void close() throws Exception {
            client.close();
        }
    
        @Override
        public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
    
            // issue the asynchronous request, receive a future for result
            final Future<String> result = client.query(key);
    
            // set the callback to be executed once the request by the client is complete
            // the callback simply forwards the result to the result future
            CompletableFuture.supplyAsync(new Supplier<String>() {
    
                @Override
                public String get() {
                    try {
                        return result.get();
                    } catch (InterruptedException | ExecutionException e) {
                        // Normally handled explicitly.
                        return null;
                    }
                }
            }).thenAccept( (String dbResult) -> {
                resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
            });
        }
    }
    
    // create the original stream
    DataStream<String> stream = ...;
    
    // apply the async I/O transformation
    DataStream<Tuple2<String, String>> resultStream =
        AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
    
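    • This example shows the basic usage of Flink's Async I/O. First, implement the AsyncFunction interface: issue the asynchronous request there and hand the result or the exception to resultFuture. Then apply the AsyncFunction to a DataStream as a transformation through AsyncDataStream.unorderedWait or AsyncDataStream.orderedWait. Both methods take two parameters that govern the async operation: timeout sets how long an async request may run before it is treated as timed out, and capacity caps how many async requests may be in flight at the same time.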
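
    The example above parks a pool thread in result.get() and turns a failure into a null result. When the client already hands back a CompletableFuture, the value or the failure can be forwarded from a callback instead. The following is a minimal JDK-only sketch of that idea, not Flink code: ResultSink is a hypothetical stand-in for Flink's ResultFuture (it keeps only complete and completeExceptionally), and RecordingSink and forward are names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** JDK-only sketch; ResultSink is a hypothetical stand-in for Flink's ResultFuture. */
final class CallbackForwardingSketch {

    /** Mirrors the two ResultFuture methods discussed in the article. */
    interface ResultSink<T> {
        void complete(Collection<T> results);
        void completeExceptionally(Throwable error);
    }

    /** Records whatever it receives so the outcome can be inspected afterwards. */
    static final class RecordingSink<T> implements ResultSink<T> {
        final List<T> results = new ArrayList<>();
        Throwable error;

        @Override public void complete(Collection<T> r) { results.addAll(r); }
        @Override public void completeExceptionally(Throwable e) { error = e; }
    }

    /** Forwards either the value or the failure from a callback, never calling get(). */
    static void forward(String key, CompletableFuture<String> pending, ResultSink<String> sink) {
        pending.whenComplete((value, failure) -> {
            if (failure != null) {
                sink.completeExceptionally(failure);
            } else {
                sink.complete(Collections.singleton(key + "=" + value));
            }
        });
    }

    public static void main(String[] args) {
        // Successful lookup: the sink receives one element.
        RecordingSink<String> ok = new RecordingSink<>();
        CompletableFuture<String> first = new CompletableFuture<>();
        forward("k1", first, ok);
        first.complete("v1");
        System.out.println(ok.results);

        // Failed lookup: the sink receives the exception instead of a null value.
        RecordingSink<String> failed = new RecordingSink<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        forward("k2", second, failed);
        second.completeExceptionally(new IllegalStateException("lookup failed"));
        System.out.println(failed.error.getMessage());
    }
}
```

    In a real AsyncFunction the same whenComplete callback would presumably call resultFuture.complete(...) or resultFuture.completeExceptionally(...) directly, so a failed lookup surfaces as a failure rather than as a tuple with a null value.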

    AsyncFunction

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/AsyncFunction.java

    /**
     * A function to trigger Async I/O operation.
     *
     * <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
     * the result can be collected by calling {@link ResultFuture#complete}. For each async
     * operation, its context is stored in the operator immediately after invoking
     * #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
     *
     * <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data.
     * An error can also be propagate to the async IO operator by
     * {@link ResultFuture#completeExceptionally(Throwable)}.
     *
     * <p>Callback example usage:
     *
     * <pre>{@code
     * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
     *
     *   public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
     *     HBaseCallback cb = new HBaseCallback(result);
     *     Get get = new Get(Bytes.toBytes(row));
     *     hbase.asyncGet(get, cb);
     *   }
     * }
     * }</pre>
     *
     * <p>Future example usage:
     *
     * <pre>{@code
     * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
     *
     *   public void asyncInvoke(String row, final ResultFuture<String> result) throws Exception {
     *     Get get = new Get(Bytes.toBytes(row));
     *     ListenableFuture<Result> future = hbase.asyncGet(get);
     *     Futures.addCallback(future, new FutureCallback<Result>() {
     *       public void onSuccess(Result result) {
     *         List<String> ret = process(result);
     *         result.complete(ret);
     *       }
     *       public void onFailure(Throwable thrown) {
     *         result.completeExceptionally(thrown);
     *       }
     *     });
     *   }
     * }
     * }</pre>
     *
     * @param <IN> The type of the input elements.
     * @param <OUT> The type of the returned elements.
     */
    @PublicEvolving
    public interface AsyncFunction<IN, OUT> extends Function, Serializable {
    
        /**
         * Trigger async operation for each stream input.
         *
         * @param input element coming from an upstream task
         * @param resultFuture to be completed with the result data
         * @exception Exception in case of a user code error. An exception will make the task fail and
         * trigger fail-over process.
         */
        void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
    
        /**
         * {@link AsyncFunction#asyncInvoke} timeout occurred.
         * By default, the result future is exceptionally completed with a timeout exception.
         *
         * @param input element coming from an upstream task
         * @param resultFuture to be completed with the result data
         */
        default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
            resultFuture.completeExceptionally(
                new TimeoutException("Async function call has timed out."));
        }
    
    }
    
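    • AsyncFunction extends Function and declares the asyncInvoke method plus a default timeout method. asyncInvoke runs the asynchronous logic and hands the result to the ResultFuture through ResultFuture.complete; a failure is propagated through ResultFuture.completeExceptionally(Throwable). The default timeout implementation completes the result future exceptionally with a TimeoutException.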
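
    Because timeout is a default method, an implementation can override it when a timed-out request should not fail the job. The sketch below is JDK-only and simplified: AsyncFn and ResultSink are hypothetical stand-ins for AsyncFunction and ResultFuture (checked exceptions are omitted), and FailOnTimeout, DropOnTimeout and sinkFor are names invented here. It contrasts the default behaviour with an override that completes with an empty collection, which should amount to emitting nothing for that element.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/** JDK-only sketch; AsyncFn and ResultSink are simplified, hypothetical stand-ins. */
final class TimeoutOverrideSketch {

    interface ResultSink<T> {
        void complete(Collection<T> results);
        void completeExceptionally(Throwable error);
    }

    /** Same shape as the quoted interface: one abstract method plus a default timeout. */
    interface AsyncFn<IN, OUT> {
        void asyncInvoke(IN input, ResultSink<OUT> sink);

        default void timeout(IN input, ResultSink<OUT> sink) {
            sink.completeExceptionally(new TimeoutException("Async function call has timed out."));
        }
    }

    /** Adapts a CompletableFuture to the ResultSink shape so outcomes are easy to inspect. */
    static <T> ResultSink<T> sinkFor(CompletableFuture<Collection<T>> target) {
        return new ResultSink<T>() {
            @Override public void complete(Collection<T> results) { target.complete(results); }
            @Override public void completeExceptionally(Throwable error) { target.completeExceptionally(error); }
        };
    }

    /** Keeps the default: a timeout surfaces as a failure. */
    static final class FailOnTimeout implements AsyncFn<String, String> {
        @Override public void asyncInvoke(String input, ResultSink<String> sink) {
            // The request is never answered in this sketch.
        }
    }

    /** Overrides timeout(): completes with an empty collection instead of an exception. */
    static final class DropOnTimeout implements AsyncFn<String, String> {
        @Override public void asyncInvoke(String input, ResultSink<String> sink) {
            // The request is never answered in this sketch.
        }

        @Override public void timeout(String input, ResultSink<String> sink) {
            sink.complete(Collections.emptyList());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Collection<String>> strict = new CompletableFuture<>();
        new FailOnTimeout().timeout("k", sinkFor(strict));
        System.out.println(strict.isCompletedExceptionally());

        CompletableFuture<Collection<String>> lenient = new CompletableFuture<>();
        new DropOnTimeout().timeout("k", sinkFor(lenient));
        System.out.println(lenient.join().isEmpty());
    }
}
```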

    RichAsyncFunction

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java

    @PublicEvolving
    public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {
    
        private static final long serialVersionUID = 3858030061138121840L;
    
        @Override
        public void setRuntimeContext(RuntimeContext runtimeContext) {
            Preconditions.checkNotNull(runtimeContext);
    
            if (runtimeContext instanceof IterationRuntimeContext) {
                super.setRuntimeContext(
                    new RichAsyncFunctionIterationRuntimeContext(
                        (IterationRuntimeContext) runtimeContext));
            } else {
                super.setRuntimeContext(new RichAsyncFunctionRuntimeContext(runtimeContext));
            }
        }
    
        @Override
        public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
    
        //......
    }
    
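    • RichAsyncFunction extends AbstractRichFunction and declares that it implements AsyncFunction. It does not implement asyncInvoke, which is left to subclasses. It overrides setRuntimeContext to wrap the incoming context in a RichAsyncFunctionRuntimeContext or, when the context is an IterationRuntimeContext, in a RichAsyncFunctionIterationRuntimeContext.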

    RichAsyncFunctionRuntimeContext

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java

        /**
         * A wrapper class for async function's {@link RuntimeContext}. The async function runtime
         * context only supports basic operations which are thread safe. Consequently, state access,
         * accumulators, broadcast variables and the distributed cache are disabled.
         */
        private static class RichAsyncFunctionRuntimeContext implements RuntimeContext {
            private final RuntimeContext runtimeContext;
    
            RichAsyncFunctionRuntimeContext(RuntimeContext context) {
                runtimeContext = Preconditions.checkNotNull(context);
            }
    
            @Override
            public String getTaskName() {
                return runtimeContext.getTaskName();
            }
    
            @Override
            public MetricGroup getMetricGroup() {
                return runtimeContext.getMetricGroup();
            }
    
            @Override
            public int getNumberOfParallelSubtasks() {
                return runtimeContext.getNumberOfParallelSubtasks();
            }
    
            @Override
            public int getMaxNumberOfParallelSubtasks() {
                return runtimeContext.getMaxNumberOfParallelSubtasks();
            }
    
            @Override
            public int getIndexOfThisSubtask() {
                return runtimeContext.getIndexOfThisSubtask();
            }
    
            @Override
            public int getAttemptNumber() {
                return runtimeContext.getAttemptNumber();
            }
    
            @Override
            public String getTaskNameWithSubtasks() {
                return runtimeContext.getTaskNameWithSubtasks();
            }
    
            @Override
            public ExecutionConfig getExecutionConfig() {
                return runtimeContext.getExecutionConfig();
            }
    
            @Override
            public ClassLoader getUserCodeClassLoader() {
                return runtimeContext.getUserCodeClassLoader();
            }
    
            // -----------------------------------------------------------------------------------
            // Unsupported operations
            // -----------------------------------------------------------------------------------
    
            @Override
            public DistributedCache getDistributedCache() {
                throw new UnsupportedOperationException("Distributed cache is not supported in rich async functions.");
            }
    
            @Override
            public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
                throw new UnsupportedOperationException("State is not supported in rich async functions.");
            }
    
            @Override
            public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
                throw new UnsupportedOperationException("State is not supported in rich async functions.");
            }
    
            @Override
            public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
                throw new UnsupportedOperationException("State is not supported in rich async functions.");
            }
    
            @Override
            public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
                throw new UnsupportedOperationException("State is not supported in rich async functions.");
            }
    
            @Override
            public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
                throw new UnsupportedOperationException("State is not supported in rich async functions.");
            }
    
            @Override
            public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
                throw new UnsupportedOperationException("State is not supported in rich async functions.");
            }
    
            @Override
            public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
                throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
            }
    
            @Override
            public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
                throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
            }
    
            @Override
            public Map<String, Accumulator<?, ?>> getAllAccumulators() {
                throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
            }
    
            @Override
            public IntCounter getIntCounter(String name) {
                throw new UnsupportedOperationException("Int counters are not supported in rich async functions.");
            }
    
            @Override
            public LongCounter getLongCounter(String name) {
                throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
            }
    
            @Override
            public DoubleCounter getDoubleCounter(String name) {
                throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
            }
    
            @Override
            public Histogram getHistogram(String name) {
                throw new UnsupportedOperationException("Histograms are not supported in rich async functions.");
            }
    
            @Override
            public boolean hasBroadcastVariable(String name) {
                throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
            }
    
            @Override
            public <RT> List<RT> getBroadcastVariable(String name) {
                throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
            }
    
            @Override
            public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
                throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
            }
        }
    
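    • RichAsyncFunctionRuntimeContext implements the RuntimeContext interface. It delegates the thread-safe methods (task name, metric group, parallelism, subtask index, attempt number, execution config, user-code class loader) to the wrapped RuntimeContext; every other method (state, accumulators, counters, histograms, broadcast variables, distributed cache) throws UnsupportedOperationException.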
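
    The wrapper follows a plain delegation pattern: pass through what is safe, reject the rest. A minimal JDK-only sketch of that pattern is shown below. Context, FullContext and RestrictedContext are hypothetical names, and Context is a far smaller interface than the real RuntimeContext; only the structure is meant to carry over.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** JDK-only sketch of a restricting wrapper; all types here are hypothetical. */
final class RestrictedContextSketch {

    /** Much smaller stand-in for RuntimeContext. */
    interface Context {
        String getTaskName();
        int getIndexOfThisSubtask();
        Map<String, Object> getState();
    }

    /** The unrestricted context that the runtime would normally provide. */
    static final class FullContext implements Context {
        private final Map<String, Object> state = new HashMap<>();

        @Override public String getTaskName() { return "demo-task"; }
        @Override public int getIndexOfThisSubtask() { return 0; }
        @Override public Map<String, Object> getState() { return state; }
    }

    /** Delegates read-only metadata and rejects state access. */
    static final class RestrictedContext implements Context {
        private final Context delegate;

        RestrictedContext(Context delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        @Override public String getTaskName() { return delegate.getTaskName(); }
        @Override public int getIndexOfThisSubtask() { return delegate.getIndexOfThisSubtask(); }

        @Override public Map<String, Object> getState() {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }
    }

    public static void main(String[] args) {
        Context ctx = new RestrictedContext(new FullContext());
        System.out.println(ctx.getTaskName());
        try {
            ctx.getState();
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```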

    RichAsyncFunctionIterationRuntimeContext

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java

        private static class RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext implements IterationRuntimeContext {
    
            private final IterationRuntimeContext iterationRuntimeContext;
    
            RichAsyncFunctionIterationRuntimeContext(IterationRuntimeContext iterationRuntimeContext) {
                super(iterationRuntimeContext);
    
                this.iterationRuntimeContext = Preconditions.checkNotNull(iterationRuntimeContext);
            }
    
            @Override
            public int getSuperstepNumber() {
                return iterationRuntimeContext.getSuperstepNumber();
            }
    
            // -----------------------------------------------------------------------------------
            // Unsupported operations
            // -----------------------------------------------------------------------------------
    
            @Override
            public <T extends Aggregator<?>> T getIterationAggregator(String name) {
                throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
            }
    
            @Override
            public <T extends Value> T getPreviousIterationAggregate(String name) {
                throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
            }
        }
    
    • RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext and implements the IterationRuntimeContext interface. It delegates getSuperstepNumber to the wrapped IterationRuntimeContext and overrides getIterationAggregator and getPreviousIterationAggregate to throw UnsupportedOperationException.

    AsyncDataStream

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/AsyncDataStream.java

    @PublicEvolving
    public class AsyncDataStream {
    
        /**
         * Output mode for asynchronous operations.
         */
        public enum OutputMode { ORDERED, UNORDERED }
    
        private static final int DEFAULT_QUEUE_CAPACITY = 100;
    
        private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
                DataStream<IN> in,
                AsyncFunction<IN, OUT> func,
                long timeout,
                int bufSize,
                OutputMode mode) {
    
            TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
                func,
                AsyncFunction.class,
                0,
                1,
                new int[]{1, 0},
                in.getType(),
                Utils.getCallLocationName(),
                true);
    
            // create transform
            AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
                in.getExecutionEnvironment().clean(func),
                timeout,
                bufSize,
                mode);
    
            return in.transform("async wait operator", outTypeInfo, operator);
        }
    
        public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
                DataStream<IN> in,
                AsyncFunction<IN, OUT> func,
                long timeout,
                TimeUnit timeUnit,
                int capacity) {
            return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
        }
    
        public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
                DataStream<IN> in,
                AsyncFunction<IN, OUT> func,
                long timeout,
                TimeUnit timeUnit) {
            return addOperator(
                in,
                func,
                timeUnit.toMillis(timeout),
                DEFAULT_QUEUE_CAPACITY,
                OutputMode.UNORDERED);
        }
    
        public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
                DataStream<IN> in,
                AsyncFunction<IN, OUT> func,
                long timeout,
                TimeUnit timeUnit,
                int capacity) {
            return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
        }
    
        public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
                DataStream<IN> in,
                AsyncFunction<IN, OUT> func,
                long timeout,
                TimeUnit timeUnit) {
            return addOperator(
                in,
                func,
                timeUnit.toMillis(timeout),
                DEFAULT_QUEUE_CAPACITY,
                OutputMode.ORDERED);
        }
    }
    
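    • AsyncDataStream provides two families of methods, unorderedWait and orderedWait, for applying an AsyncFunction to a DataStream.
    • Both families come with and without a capacity parameter; the overloads without it fall back to DEFAULT_QUEUE_CAPACITY, which is 100. All of them end up in the private addOperator method, which builds an AsyncWaitOperator. Every overload takes a timeout, which bounds how long to wait for the async operation to complete.
    • AsyncDataStream defines two OutputMode values. UNORDERED emits a result as soon as its async operation completes; with TimeCharacteristic.ProcessingTime this mode has the lowest latency and the lowest overhead. ORDERED emits results in the order the elements arrived; to preserve that order the operator has to buffer results, which adds some latency and overhead.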
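
    The difference between the two output modes can be illustrated without Flink. The sketch below is a single-threaded JDK-only simulation, not the AsyncWaitOperator implementation: the class and method names are invented here, inputs are assumed to be unique strings, and completionOrder is a made-up parameter that dictates the order in which the simulated requests finish.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** JDK-only simulation of unordered vs. ordered emission; not Flink's implementation. */
final class OutputModeSketch {

    /** Emits each result as soon as its future completes. */
    static List<String> unordered(List<String> inputs, List<String> completionOrder) {
        List<String> emitted = new ArrayList<>();
        Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
        for (String input : inputs) {
            CompletableFuture<String> future = new CompletableFuture<>();
            future.thenAccept(emitted::add);
            pending.put(input, future);
        }
        for (String key : completionOrder) {
            pending.get(key).complete(key);
        }
        return emitted;
    }

    /** Buffers finished results until every earlier input has been emitted. */
    static List<String> ordered(List<String> inputs, List<String> completionOrder) {
        List<String> emitted = new ArrayList<>();
        Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
        Deque<CompletableFuture<String>> queue = new ArrayDeque<>();
        for (String input : inputs) {
            CompletableFuture<String> future = new CompletableFuture<>();
            pending.put(input, future);
            queue.add(future);
        }
        for (String key : completionOrder) {
            pending.get(key).complete(key);
            // Drain from the head only while the oldest outstanding request is finished.
            while (!queue.isEmpty() && queue.peek().isDone()) {
                emitted.add(queue.poll().join());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("A", "B", "C");
        List<String> completion = Arrays.asList("C", "A", "B");
        System.out.println(unordered(inputs, completion)); // [C, A, B]
        System.out.println(ordered(inputs, completion));   // [A, B, C]
    }
}
```

    In the ordered run, the result for C is ready first but stays buffered until A and B have been emitted, which is the buffering cost the text describes.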

    Summary

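    • Flink offers an Asynchronous I/O API for accessing external data, intended to raise streaming throughput. The basic usage is to define a function that implements AsyncFunction and then apply it to a DataStream as a transformation through AsyncDataStream.unorderedWait or AsyncDataStream.orderedWait.
    • AsyncFunction extends Function and declares asyncInvoke plus a default timeout method. asyncInvoke runs the asynchronous logic and hands the result to the ResultFuture through ResultFuture.complete; a failure is propagated through ResultFuture.completeExceptionally(Throwable). RichAsyncFunction extends AbstractRichFunction and declares that it implements AsyncFunction; it does not implement asyncInvoke, which is left to subclasses. It overrides setRuntimeContext to wrap the context in a RichAsyncFunctionRuntimeContext or a RichAsyncFunctionIterationRuntimeContext.
    • unorderedWait and orderedWait take two parameters that govern the async operation: timeout sets how long an async request may run before it times out, and capacity caps how many async requests may be in flight at the same time. AsyncDataStream defines two OutputMode values. UNORDERED emits a result as soon as its async operation completes; with TimeCharacteristic.ProcessingTime it has the lowest latency and the lowest overhead. ORDERED emits results in the order the elements arrived, so the operator has to buffer results, which adds some latency and overhead.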
