A Look at Flink's AsyncWaitOperator

Author: go4it | Published 2019-01-20 11:32

    This article takes a look at Flink's AsyncWaitOperator.

    AsyncWaitOperator

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java

    @Internal
    public class AsyncWaitOperator<IN, OUT>
            extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
            implements OneInputStreamOperator<IN, OUT>, OperatorActions {
        private static final long serialVersionUID = 1L;
    
        private static final String STATE_NAME = "_async_wait_operator_state_";
    
        /** Capacity of the stream element queue. */
        private final int capacity;
    
        /** Output mode for this operator. */
        private final AsyncDataStream.OutputMode outputMode;
    
        /** Timeout for the async collectors. */
        private final long timeout;
    
        protected transient Object checkpointingLock;
    
        /** {@link TypeSerializer} for inputs while making snapshots. */
        private transient StreamElementSerializer<IN> inStreamElementSerializer;
    
        /** Recovered input stream elements. */
        private transient ListState<StreamElement> recoveredStreamElements;
    
        /** Queue to store the currently in-flight stream elements into. */
        private transient StreamElementQueue queue;
    
        /** Pending stream element which could not yet added to the queue. */
        private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;
    
        private transient ExecutorService executor;
    
        /** Emitter for the completed stream element queue entries. */
        private transient Emitter<OUT> emitter;
    
        /** Thread running the emitter. */
        private transient Thread emitterThread;
    
        public AsyncWaitOperator(
                AsyncFunction<IN, OUT> asyncFunction,
                long timeout,
                int capacity,
                AsyncDataStream.OutputMode outputMode) {
            super(asyncFunction);
            chainingStrategy = ChainingStrategy.ALWAYS;
    
            Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");
            this.capacity = capacity;
    
            this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");
    
            this.timeout = timeout;
        }
    
        @Override
        public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
            super.setup(containingTask, config, output);
    
            this.checkpointingLock = getContainingTask().getCheckpointLock();
    
            this.inStreamElementSerializer = new StreamElementSerializer<>(
                getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
    
            // create the operators executor for the complete operations of the queue entries
            this.executor = Executors.newSingleThreadExecutor();
    
            switch (outputMode) {
                case ORDERED:
                    queue = new OrderedStreamElementQueue(
                        capacity,
                        executor,
                        this);
                    break;
                case UNORDERED:
                    queue = new UnorderedStreamElementQueue(
                        capacity,
                        executor,
                        this);
                    break;
                default:
                    throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
            }
        }
    
        @Override
        public void open() throws Exception {
            super.open();
    
            // create the emitter
            this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
    
            // start the emitter thread
            this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
            emitterThread.setDaemon(true);
            emitterThread.start();
    
            // process stream elements from state, since the Emit thread will start as soon as all
            // elements from previous state are in the StreamElementQueue, we have to make sure that the
            // order to open all operators in the operator chain proceeds from the tail operator to the
            // head operator.
            if (recoveredStreamElements != null) {
                for (StreamElement element : recoveredStreamElements.get()) {
                    if (element.isRecord()) {
                        processElement(element.<IN>asRecord());
                    }
                    else if (element.isWatermark()) {
                        processWatermark(element.asWatermark());
                    }
                    else if (element.isLatencyMarker()) {
                        processLatencyMarker(element.asLatencyMarker());
                    }
                    else {
                        throw new IllegalStateException("Unknown record type " + element.getClass() +
                            " encountered while opening the operator.");
                    }
                }
                recoveredStreamElements = null;
            }
    
        }
    
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
    
            if (timeout > 0L) {
                // register a timeout for this AsyncStreamRecordBufferEntry
                long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
    
                final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
                    timeoutTimestamp,
                    new ProcessingTimeCallback() {
                        @Override
                        public void onProcessingTime(long timestamp) throws Exception {
                            userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                        }
                    });
    
                // Cancel the timer once we've completed the stream record buffer entry. This will remove
                // the register trigger task
                streamRecordBufferEntry.onComplete(
                    (StreamElementQueueEntry<Collection<OUT>> value) -> {
                        timerFuture.cancel(true);
                    },
                    executor);
            }
    
            addAsyncBufferEntry(streamRecordBufferEntry);
    
            userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
        }
    
        @Override
        public void processWatermark(Watermark mark) throws Exception {
            WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);
    
            addAsyncBufferEntry(watermarkBufferEntry);
        }
    
        @Override
        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
    
            ListState<StreamElement> partitionableState =
                getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
            partitionableState.clear();
    
            Collection<StreamElementQueueEntry<?>> values = queue.values();
    
            try {
                for (StreamElementQueueEntry<?> value : values) {
                    partitionableState.add(value.getStreamElement());
                }
    
                // add the pending stream element queue entry if the stream element queue is currently full
                if (pendingStreamElementQueueEntry != null) {
                    partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
                }
            } catch (Exception e) {
                partitionableState.clear();
    
                throw new Exception("Could not add stream element queue entries to operator state " +
                    "backend of operator " + getOperatorName() + '.', e);
            }
        }
    
        @Override
        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            recoveredStreamElements = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    
        }
    
        @Override
        public void close() throws Exception {
            try {
                assert(Thread.holdsLock(checkpointingLock));
    
                while (!queue.isEmpty()) {
                    // wait for the emitter thread to output the remaining elements
                    // for that he needs the checkpointing lock and thus we have to free it
                    checkpointingLock.wait();
                }
            }
            finally {
                Exception exception = null;
    
                try {
                    super.close();
                } catch (InterruptedException interrupted) {
                    exception = interrupted;
    
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    exception = e;
                }
    
                try {
                    // terminate the emitter, the emitter thread and the executor
                    stopResources(true);
                } catch (InterruptedException interrupted) {
                    exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
    
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
    
                if (exception != null) {
                    LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception);
                }
            }
        }
    
        @Override
        public void dispose() throws Exception {
            Exception exception = null;
    
            try {
                super.dispose();
            } catch (InterruptedException interrupted) {
                exception = interrupted;
    
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                exception = e;
            }
    
            try {
                stopResources(false);
            } catch (InterruptedException interrupted) {
                exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
    
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
    
            if (exception != null) {
                throw exception;
            }
        }
    
        private void stopResources(boolean waitForShutdown) throws InterruptedException {
            emitter.stop();
            emitterThread.interrupt();
    
            executor.shutdown();
    
            if (waitForShutdown) {
                try {
                    if (!executor.awaitTermination(365L, TimeUnit.DAYS)) {
                        executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    executor.shutdownNow();
    
                    Thread.currentThread().interrupt();
                }
    
                /*
                 * FLINK-5638: If we have the checkpoint lock we might have to free it for a while so
                 * that the emitter thread can complete/react to the interrupt signal.
                 */
                if (Thread.holdsLock(checkpointingLock)) {
                    while (emitterThread.isAlive()) {
                        checkpointingLock.wait(100L);
                    }
                }
    
                emitterThread.join();
            } else {
                executor.shutdownNow();
            }
        }
    
        private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
            assert(Thread.holdsLock(checkpointingLock));
    
            pendingStreamElementQueueEntry = streamElementQueueEntry;
    
            while (!queue.tryPut(streamElementQueueEntry)) {
                // we wait for the emitter to notify us if the queue has space left again
                checkpointingLock.wait();
            }
    
            pendingStreamElementQueueEntry = null;
        }
    
        @Override
        public void failOperator(Throwable throwable) {
            getContainingTask().getEnvironment().failExternally(throwable);
        }
    }
    
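    • AsyncWaitOperator extends AbstractUdfStreamOperator and overrides its setup, open, snapshotState, initializeState, close and dispose methods. Of the methods defined by the OneInputStreamOperator interface it implements processElement and processWatermark (processLatencyMarker does not appear in the listing above, so the inherited implementation is used). It also implements the failOperator method defined by OperatorActions.
    • setup creates an ExecutorService with Executors.newSingleThreadExecutor() and then, depending on outputMode, creates the matching StreamElementQueue (OrderedStreamElementQueue or UnorderedStreamElementQueue). open creates an Emitter, starts it in the AsyncIO-Emitter-Thread, and then replays recoveredStreamElements, dispatching each element by type to processElement, processWatermark or processLatencyMarker.
    • processElement first checks timeout: if it is greater than 0, it registers a timer whose ProcessingTimeCallback.onProcessingTime calls userFunction.timeout, and it cancels that timer once the entry completes. It then adds the StreamRecordQueueEntry to the StreamElementQueue through addAsyncBufferEntry, which waits on checkpointingLock while the queue is full, and finally calls userFunction.asyncInvoke. close and dispose both call stopResources to release resources; they differ in the waitForShutdown argument: close passes true and dispose passes false. close additionally waits until the queue is empty before stopping.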
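
    The timer handling in processElement can be illustrated without Flink. The following is a minimal sketch using only the JDK; the class TimeoutSketch, the method invokeWithTimeout and the marker strings "result" and "timeout" are hypothetical names chosen for illustration, and a CompletableFuture stands in for the StreamRecordQueueEntry. It assumes the timeout handler simply completes the entry with a marker value; a real handler could equally complete it exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class TimeoutSketch {

    /**
     * Simulates one async call that finishes after workMillis and is guarded
     * by a timer that fires after timeoutMillis. Returns whichever completes
     * the entry first.
     */
    static String invokeWithTimeout(long workMillis, long timeoutMillis) {
        // Hypothetical stand-in for the processing time service and the async client.
        ScheduledExecutorService timerService = Executors.newScheduledThreadPool(2);
        try {
            CompletableFuture<String> entry = new CompletableFuture<>();

            // Step 1: register the timeout timer (analogous to registerTimer).
            ScheduledFuture<?> timer = timerService.schedule(
                () -> { entry.complete("timeout"); },
                timeoutMillis, TimeUnit.MILLISECONDS);

            // Step 2: once the entry completes, the timer is no longer needed.
            // cancel(false) is enough here because we only need to unschedule it.
            entry.whenComplete((value, error) -> timer.cancel(false));

            // Step 3: trigger the async call (analogous to asyncInvoke).
            timerService.schedule(
                () -> { entry.complete("result"); },
                workMillis, TimeUnit.MILLISECONDS);

            // complete() only succeeds for the first caller, so the loser is ignored.
            return entry.join();
        } finally {
            timerService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeWithTimeout(10, 2000));   // async call wins
        System.out.println(invokeWithTimeout(2000, 10));   // timer wins
    }
}
```

    The point of the sketch is the ordering: the timer is registered before the async call is triggered, and completion of the entry, by either side, cancels the timer.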

    Emitter

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/Emitter.java

    @Internal
    public class Emitter<OUT> implements Runnable {
    
        private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);
    
        /** Lock to hold before outputting. */
        private final Object checkpointLock;
    
        /** Output for the watermark elements. */
        private final Output<StreamRecord<OUT>> output;
    
        /** Queue to consume the async results from. */
        private final StreamElementQueue streamElementQueue;
    
        private final OperatorActions operatorActions;
    
        /** Output for stream records. */
        private final TimestampedCollector<OUT> timestampedCollector;
    
        private volatile boolean running;
    
        public Emitter(
                final Object checkpointLock,
                final Output<StreamRecord<OUT>> output,
                final StreamElementQueue streamElementQueue,
                final OperatorActions operatorActions) {
    
            this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
            this.output = Preconditions.checkNotNull(output, "output");
            this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
            this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
    
            this.timestampedCollector = new TimestampedCollector<>(this.output);
            this.running = true;
        }
    
        @Override
        public void run() {
            try {
                while (running) {
                    LOG.debug("Wait for next completed async stream element result.");
                    AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
    
                    output(streamElementEntry);
                }
            } catch (InterruptedException e) {
                if (running) {
                    operatorActions.failOperator(e);
                } else {
                    // Thread got interrupted which means that it should shut down
                    LOG.debug("Emitter thread got interrupted, shutting down.");
                }
            } catch (Throwable t) {
                operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " +
                    "unexpected throwable.", t));
            }
        }
    
        private void output(AsyncResult asyncResult) throws InterruptedException {
            if (asyncResult.isWatermark()) {
                synchronized (checkpointLock) {
                    AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
    
                    LOG.debug("Output async watermark.");
                    output.emitWatermark(asyncWatermarkResult.getWatermark());
    
                    // remove the peeked element from the async collector buffer so that it is no longer
                    // checkpointed
                    streamElementQueue.poll();
    
                    // notify the main thread that there is again space left in the async collector
                    // buffer
                    checkpointLock.notifyAll();
                }
            } else {
                AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();
    
                if (streamRecordResult.hasTimestamp()) {
                    timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
                } else {
                    timestampedCollector.eraseTimestamp();
                }
    
                synchronized (checkpointLock) {
                    LOG.debug("Output async stream element collection result.");
    
                    try {
                        Collection<OUT> resultCollection = streamRecordResult.get();
    
                        if (resultCollection != null) {
                            for (OUT result : resultCollection) {
                                timestampedCollector.collect(result);
                            }
                        }
                    } catch (Exception e) {
                        operatorActions.failOperator(
                            new Exception("An async function call terminated with an exception. " +
                                "Failing the AsyncWaitOperator.", e));
                    }
    
                    // remove the peeked element from the async collector buffer so that it is no longer
                    // checkpointed
                    streamElementQueue.poll();
    
                    // notify the main thread that there is again space left in the async collector
                    // buffer
                    checkpointLock.notifyAll();
                }
            }
        }
    
        public void stop() {
            running = false;
        }
    }
    
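    • Emitter implements Runnable. It takes completed elements from the StreamElementQueue and emits them: records go through a TimestampedCollector, watermarks go directly to the Output.
    • Its run method loops, calling streamElementQueue.peekBlockingly() to block until an AsyncResult is available, and then passes that result to output.
    • output branches on whether asyncResult is a watermark. A watermark is emitted with output.emitWatermark; otherwise each result is emitted with timestampedCollector.collect, and if an exception occurs it is passed on through operatorActions.failOperator. In both branches it then calls streamElementQueue.poll() to remove the head element and checkpointLock.notifyAll() to wake the main thread, all while holding checkpointLock.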
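
    The handshake between the main thread (addAsyncBufferEntry waiting on the lock while the queue is full) and the emitter (poll followed by notifyAll) can be sketched with plain JDK monitors. This is a simplified illustration, not Flink's code: the class EmitterHandshakeSketch and the method run are hypothetical, integers stand in for stream elements, every element is treated as already completed, and a single lock is used both for "queue has space" and "queue has elements", whereas the real queues use their own ReentrantLock for the latter.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

class EmitterHandshakeSketch {

    /** Pushes `elements` integers through a queue bounded by `capacity`. */
    static List<Integer> run(int capacity, int elements) {
        final Object checkpointLock = new Object();
        final ArrayDeque<Integer> queue = new ArrayDeque<>();
        final List<Integer> emitted = new ArrayList<>();

        Thread emitter = new Thread(() -> {
            try {
                synchronized (checkpointLock) {
                    while (emitted.size() < elements) {
                        while (queue.isEmpty()) {
                            checkpointLock.wait();      // releases the lock while waiting
                        }
                        emitted.add(queue.peek());      // emit downstream first
                        queue.poll();                   // then drop it from the queue
                        checkpointLock.notifyAll();     // tell the producer there is space
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        emitter.setDaemon(true);
        emitter.start();

        try {
            // The producer holds the lock while processing, like the task thread.
            synchronized (checkpointLock) {
                for (int i = 0; i < elements; i++) {
                    while (queue.size() >= capacity) {
                        checkpointLock.wait();          // free the lock so the emitter can drain
                    }
                    queue.add(i);
                    checkpointLock.notifyAll();         // wake the emitter if it was idle
                }
            }
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 5));
    }
}
```

    Peeking before polling mirrors the comment in the listing: an element stays in the queue, and therefore in any checkpoint, until it has actually been emitted.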

    StreamElementQueue

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java

    @Internal
    public interface StreamElementQueue {
    
        <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;
    
        <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;
    
        AsyncResult peekBlockingly() throws InterruptedException;
    
        AsyncResult poll() throws InterruptedException;
    
        Collection<StreamElementQueueEntry<?>> values() throws InterruptedException;
    
        boolean isEmpty();
    
        int size();
    }
    
    • The StreamElementQueue interface defines the blocking stream element queue used by AsyncWaitOperator. It declares put, tryPut, peekBlockingly, poll, values, isEmpty and size. It has two implementations, UnorderedStreamElementQueue and OrderedStreamElementQueue, and its element type is StreamElementQueueEntry.

    UnorderedStreamElementQueue

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java

    @Internal
    public class UnorderedStreamElementQueue implements StreamElementQueue {
    
        private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
    
        /** Capacity of this queue. */
        private final int capacity;
    
        /** Executor to run the onComplete callbacks. */
        private final Executor executor;
    
        /** OperatorActions to signal the owning operator a failure. */
        private final OperatorActions operatorActions;
    
        /** Queue of uncompleted stream element queue entries segmented by watermarks. */
        private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;
    
        /** Queue of completed stream element queue entries. */
        private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;
    
        /** First (chronologically oldest) uncompleted set of stream element queue entries. */
        private Set<StreamElementQueueEntry<?>> firstSet;
    
        // Last (chronologically youngest) uncompleted set of stream element queue entries. New
        // stream element queue entries are inserted into this set.
        private Set<StreamElementQueueEntry<?>> lastSet;
        private volatile int numberEntries;
    
        /** Locks and conditions for the blocking queue. */
        private final ReentrantLock lock;
        private final Condition notFull;
        private final Condition hasCompletedEntries;
    
        public UnorderedStreamElementQueue(
                int capacity,
                Executor executor,
                OperatorActions operatorActions) {
    
            Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
            this.capacity = capacity;
    
            this.executor = Preconditions.checkNotNull(executor, "executor");
    
            this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
    
            this.uncompletedQueue = new ArrayDeque<>(capacity);
            this.completedQueue = new ArrayDeque<>(capacity);
    
            this.firstSet = new HashSet<>(capacity);
            this.lastSet = firstSet;
    
            this.numberEntries = 0;
    
            this.lock = new ReentrantLock();
            this.notFull = lock.newCondition();
            this.hasCompletedEntries = lock.newCondition();
        }
    
        @Override
        public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
            lock.lockInterruptibly();
    
            try {
                while (numberEntries >= capacity) {
                    notFull.await();
                }
    
                addEntry(streamElementQueueEntry);
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
            lock.lockInterruptibly();
    
            try {
                if (numberEntries < capacity) {
                    addEntry(streamElementQueueEntry);
    
                    LOG.debug("Put element into unordered stream element queue. New filling degree " +
                        "({}/{}).", numberEntries, capacity);
    
                    return true;
                } else {
                    LOG.debug("Failed to put element into unordered stream element queue because it " +
                        "was full ({}/{}).", numberEntries, capacity);
    
                    return false;
                }
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        public AsyncResult peekBlockingly() throws InterruptedException {
            lock.lockInterruptibly();
    
            try {
                while (completedQueue.isEmpty()) {
                    hasCompletedEntries.await();
                }
    
                LOG.debug("Peeked head element from unordered stream element queue with filling degree " +
                    "({}/{}).", numberEntries, capacity);
    
                return completedQueue.peek();
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        public AsyncResult poll() throws InterruptedException {
            lock.lockInterruptibly();
    
            try {
                while (completedQueue.isEmpty()) {
                    hasCompletedEntries.await();
                }
    
                numberEntries--;
                notFull.signalAll();
    
                LOG.debug("Polled element from unordered stream element queue. New filling degree " +
                    "({}/{}).", numberEntries, capacity);
    
                return completedQueue.poll();
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
            lock.lockInterruptibly();
    
            try {
                StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[numberEntries];
    
                array = completedQueue.toArray(array);
    
                int counter = completedQueue.size();
    
                for (StreamElementQueueEntry<?> entry: firstSet) {
                    array[counter] = entry;
                    counter++;
                }
    
                for (Set<StreamElementQueueEntry<?>> asyncBufferEntries : uncompletedQueue) {
    
                    for (StreamElementQueueEntry<?> streamElementQueueEntry : asyncBufferEntries) {
                        array[counter] = streamElementQueueEntry;
                        counter++;
                    }
                }
    
                return Arrays.asList(array);
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        public boolean isEmpty() {
            return numberEntries == 0;
        }
    
        @Override
        public int size() {
            return numberEntries;
        }
    
        public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
            lock.lockInterruptibly();
    
            try {
                if (firstSet.remove(streamElementQueueEntry)) {
                    completedQueue.offer(streamElementQueueEntry);
    
                    while (firstSet.isEmpty() && firstSet != lastSet) {
                        firstSet = uncompletedQueue.poll();
    
                        Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
    
                        while (it.hasNext()) {
                            StreamElementQueueEntry<?> bufferEntry = it.next();
    
                            if (bufferEntry.isDone()) {
                                completedQueue.offer(bufferEntry);
                                it.remove();
                            }
                        }
                    }
    
                    LOG.debug("Signal unordered stream element queue has completed entries.");
                    hasCompletedEntries.signalAll();
                }
            } finally {
                lock.unlock();
            }
        }
    
        private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
            assert(lock.isHeldByCurrentThread());
    
            if (streamElementQueueEntry.isWatermark()) {
                lastSet = new HashSet<>(capacity);
    
                if (firstSet.isEmpty()) {
                    firstSet.add(streamElementQueueEntry);
                } else {
                    Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
                    watermarkSet.add(streamElementQueueEntry);
                    uncompletedQueue.offer(watermarkSet);
                }
                uncompletedQueue.offer(lastSet);
            } else {
                lastSet.add(streamElementQueueEntry);
            }
    
            streamElementQueueEntry.onComplete(
                (StreamElementQueueEntry<T> value) -> {
                    try {
                        onCompleteHandler(value);
                    } catch (InterruptedException e) {
                        // The accept executor thread got interrupted. This is probably cause by
                        // the shutdown of the executor.
                        LOG.debug("AsyncBufferEntry could not be properly completed because the " +
                            "executor thread has been interrupted.", e);
                    } catch (Throwable t) {
                        operatorActions.failOperator(new Exception("Could not complete the " +
                            "stream element queue entry: " + value + '.', t));
                    }
                },
                executor);
    
            numberEntries++;
        }
    }
    
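    • UnorderedStreamElementQueue implements StreamElementQueue and emits results without preserving input order. Because uncompleted entries are segmented by watermark, records can only overtake each other between two watermarks. Internally it uses two ArrayDeques: uncompletedQueue and completedQueue.
    • peekBlockingly waits on hasCompletedEntries.await() while completedQueue is empty and then returns completedQueue.peek(). put and tryPut both call addEntry, which adds a record to lastSet (a watermark starts a new lastSet, which is appended to uncompletedQueue) and registers onCompleteHandler as the entry's onComplete callback.
    • onCompleteHandler moves a completed entry to completedQueue only if it can be removed from firstSet. When firstSet becomes empty, it advances to the next set from uncompletedQueue and moves over the entries in that set that are already done.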
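
    To make the watermark segmentation concrete, here is a single-threaded sketch of the addEntry and onCompleteHandler bookkeeping with the locking, capacity check and callbacks removed. The class SegmentedQueueSketch and its methods add, complete and demo are hypothetical; strings stand in for queue entries, a `done` set stands in for isDone(), and the sketch assumes a watermark entry counts as completed as soon as it is added.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

class SegmentedQueueSketch {
    private final ArrayDeque<Set<String>> uncompletedQueue = new ArrayDeque<>();
    private final ArrayDeque<String> completedQueue = new ArrayDeque<>();
    private final Set<String> done = new HashSet<>();
    private Set<String> firstSet = new HashSet<>();
    private Set<String> lastSet = firstSet;

    /** Mirrors addEntry: a watermark closes the current segment and opens a new one. */
    void add(String entry, boolean isWatermark) {
        if (isWatermark) {
            lastSet = new HashSet<>();
            if (firstSet.isEmpty()) {
                firstSet.add(entry);
            } else {
                Set<String> watermarkSet = new HashSet<>();
                watermarkSet.add(entry);
                uncompletedQueue.offer(watermarkSet);
            }
            uncompletedQueue.offer(lastSet);
            complete(entry); // assumption: watermark entries are done immediately
        } else {
            lastSet.add(entry);
        }
    }

    /** Mirrors onCompleteHandler: only entries in firstSet may be released. */
    void complete(String entry) {
        done.add(entry);
        if (firstSet.remove(entry)) {
            completedQueue.offer(entry);
            while (firstSet.isEmpty() && firstSet != lastSet) {
                firstSet = uncompletedQueue.poll();
                Iterator<String> it = firstSet.iterator();
                while (it.hasNext()) {
                    String next = it.next();
                    if (done.contains(next)) {
                        completedQueue.offer(next);
                        it.remove();
                    }
                }
            }
        }
    }

    List<String> emitted() {
        return new ArrayList<>(completedQueue);
    }

    /** Records a, b, then watermark W1, then record c; completions arrive as c, b, a. */
    static List<String> demo() {
        SegmentedQueueSketch queue = new SegmentedQueueSketch();
        queue.add("a", false);
        queue.add("b", false);
        queue.add("W1", true);
        queue.add("c", false);
        queue.complete("c"); // held back: c sits behind the watermark
        queue.complete("b"); // released at once: b is in the first segment
        queue.complete("a"); // releases a, then W1, then the waiting c
        return queue.emitted();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    In this scenario b overtakes a because both are in the same segment, while c, although it finished first, is only released after W1.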

    OrderedStreamElementQueue

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java

    @Internal
    public class OrderedStreamElementQueue implements StreamElementQueue {
    
        private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);
    
        /** Capacity of this queue. */
        private final int capacity;
    
        /** Executor to run the onCompletion callback. */
        private final Executor executor;
    
        /** Operator actions to signal a failure to the operator. */
        private final OperatorActions operatorActions;
    
        /** Lock and conditions for the blocking queue. */
        private final ReentrantLock lock;
        private final Condition notFull;
        private final Condition headIsCompleted;
    
        /** Queue for the inserted StreamElementQueueEntries. */
        private final ArrayDeque<StreamElementQueueEntry<?>> queue;
    
        public OrderedStreamElementQueue(
                int capacity,
                Executor executor,
                OperatorActions operatorActions) {
    
            Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
            this.capacity = capacity;
    
            this.executor = Preconditions.checkNotNull(executor, "executor");
    
            this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
    
            this.lock = new ReentrantLock(false);
            this.headIsCompleted = lock.newCondition();
            this.notFull = lock.newCondition();
    
            this.queue = new ArrayDeque<>(capacity);
        }
    
        @Override
        public AsyncResult peekBlockingly() throws InterruptedException {
            lock.lockInterruptibly();
    
            try {
                while (queue.isEmpty() || !queue.peek().isDone()) {
                    headIsCompleted.await();
                }
    
                LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
                    "({}/{}).", queue.size(), capacity);
    
                return queue.peek();
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        public AsyncResult poll() throws InterruptedException {
            lock.lockInterruptibly();
    
            try {
                while (queue.isEmpty() || !queue.peek().isDone()) {
                    headIsCompleted.await();
                }
    
                notFull.signalAll();
    
                LOG.debug("Polled head element from ordered stream element queue. New filling degree " +
                    "({}/{}).", queue.size() - 1, capacity);
    
                return queue.poll();
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
            lock.lockInterruptibly();
    
            try {
                StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[queue.size()];
    
                array = queue.toArray(array);
    
                return Arrays.asList(array);
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        public boolean isEmpty() {
            return queue.isEmpty();
        }
    
        @Override
        public int size() {
            return queue.size();
        }
    
        @Override
        public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
            lock.lockInterruptibly();
    
            try {
                while (queue.size() >= capacity) {
                    notFull.await();
                }
    
                addEntry(streamElementQueueEntry);
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
            lock.lockInterruptibly();
    
            try {
                if (queue.size() < capacity) {
                    addEntry(streamElementQueueEntry);
    
                    LOG.debug("Put element into ordered stream element queue. New filling degree " +
                        "({}/{}).", queue.size(), capacity);
    
                    return true;
                } else {
                    LOG.debug("Failed to put element into ordered stream element queue because it " +
                        "was full ({}/{}).", queue.size(), capacity);
    
                    return false;
                }
            } finally {
                lock.unlock();
            }
        }
    
        private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
            assert(lock.isHeldByCurrentThread());
    
            queue.addLast(streamElementQueueEntry);
    
            streamElementQueueEntry.onComplete(
                (StreamElementQueueEntry<T> value) -> {
                    try {
                        onCompleteHandler(value);
                    } catch (InterruptedException e) {
                        // we got interrupted. This indicates a shutdown of the executor
                        LOG.debug("AsyncBufferEntry could not be properly completed because the " +
                            "executor thread has been interrupted.", e);
                    } catch (Throwable t) {
                        operatorActions.failOperator(new Exception("Could not complete the " +
                            "stream element queue entry: " + value + '.', t));
                    }
                },
                executor);
        }
    
        private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
            lock.lockInterruptibly();
    
            try {
                if (!queue.isEmpty() && queue.peek().isDone()) {
                    LOG.debug("Signal ordered stream element queue has completed head element.");
                    headIsCompleted.signalAll();
                }
            } finally {
                lock.unlock();
            }
        }
    }
    
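    • OrderedStreamElementQueue implements the StreamElementQueue interface and emits results in the order in which the elements were added; internally it keeps its entries in an ArrayDeque named queue
    • peekBlockingly waits on headIsCompleted.await() while the queue is empty or its head entry is not yet done, then returns queue.peek(); poll waits the same way, then calls notFull.signalAll() and removes the head with queue.poll(); put and tryPut both call addEntry, which runs queue.addLast(streamElementQueueEntry) and registers onCompleteHandler as that entry's onComplete callback
    • onCompleteHandler checks, while holding the lock, whether the current head of the queue is done (queue.peek().isDone()), and if so calls headIsCompleted.signalAll()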
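
    The blocking behaviour described in these bullets can be sketched outside Flink. The class below is a simplified, hypothetical stand-in (OrderedQueueSketch is not a Flink class): it stores plain CompletableFuture objects instead of StreamElementQueueEntry, and registers the callback with whenComplete rather than whenCompleteAsync on an executor. It only aims to show why results leave the queue in insertion order even when they complete out of order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified, hypothetical stand-in for OrderedStreamElementQueue (not Flink code). */
final class OrderedQueueSketch<T> {
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition headIsCompleted = lock.newCondition();
    private final ArrayDeque<CompletableFuture<T>> queue = new ArrayDeque<>();

    OrderedQueueSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Blocks while the queue is full, then appends the entry and registers the completion hook. */
    void put(CompletableFuture<T> entry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.size() >= capacity) {
                notFull.await();
            }
            queue.addLast(entry);
            // Fires for normal and exceptional completion alike.
            entry.whenComplete((value, error) -> signalIfHeadDone());
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until the head entry is done, then removes it and returns its value. */
    T poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            notFull.signalAll();
            return queue.poll().join();
        } finally {
            lock.unlock();
        }
    }

    /** Wakes up waiting consumers only if the entry at the head of the queue is done. */
    private void signalIfHeadDone() {
        lock.lock();
        try {
            if (!queue.isEmpty() && queue.peek().isDone()) {
                headIsCompleted.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Completes three entries in reverse order and returns the order in which they are polled. */
    static List<String> demo() {
        OrderedQueueSketch<String> q = new OrderedQueueSketch<>(3);
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        try {
            q.put(a);
            q.put(b);
            q.put(c);
            c.complete("c");
            b.complete("b");
            // The head entry is completed last, from another thread.
            CompletableFuture.runAsync(() -> a.complete("a"));
            List<String> emitted = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                emitted.add(q.poll());
            }
            return emitted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b, c]
    }
}
```

    Completing c and b does not wake the consumer, because the head entry a is still pending; only the completion of a triggers headIsCompleted.signalAll(), after which all three results are polled in insertion order.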

    AsyncResult

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java

    @Internal
    public interface AsyncResult {
    
        boolean isWatermark();
    
        boolean isResultCollection();
    
        AsyncWatermarkResult asWatermark();
    
        <T> AsyncCollectionResult<T> asResultCollection();
    }
    
    • AsyncResult defines the methods that an asynchronous result taken from a StreamElementQueue must implement; such a result is either a watermark or an actual result collection
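
    A consumer of such a result typically branches on its kind before using it. The sketch below illustrates that pattern with hypothetical stand-in types (ResultSketch, WatermarkSketch, CollectionSketch and AsyncResultDispatch are not Flink classes); it only mirrors the isWatermark / isResultCollection part of the interface and is not meant as the way Flink itself emits results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in that mirrors the two type checks of AsyncResult (not Flink code). */
interface ResultSketch {
    boolean isWatermark();

    boolean isResultCollection();
}

/** Hypothetical watermark result carrying only a timestamp. */
final class WatermarkSketch implements ResultSketch {
    final long timestamp;

    WatermarkSketch(long timestamp) {
        this.timestamp = timestamp;
    }

    public boolean isWatermark() {
        return true;
    }

    public boolean isResultCollection() {
        return false;
    }
}

/** Hypothetical collection result carrying the values produced for one input record. */
final class CollectionSketch implements ResultSketch {
    final Collection<String> values;

    CollectionSketch(Collection<String> values) {
        this.values = values;
    }

    public boolean isWatermark() {
        return false;
    }

    public boolean isResultCollection() {
        return true;
    }
}

final class AsyncResultDispatch {
    /** Renders one result by branching on its kind, then casting to the matching type. */
    static List<String> emit(ResultSketch result) {
        List<String> out = new ArrayList<>();
        if (result.isWatermark()) {
            out.add("watermark@" + ((WatermarkSketch) result).timestamp);
        } else if (result.isResultCollection()) {
            out.addAll(((CollectionSketch) result).values);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emit(new WatermarkSketch(42L)));                   // prints [watermark@42]
        System.out.println(emit(new CollectionSketch(Arrays.asList("x", "y")))); // prints [x, y]
    }
}
```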

    StreamElementQueueEntry

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java

    @Internal
    public abstract class StreamElementQueueEntry<T> implements AsyncResult {
    
        private final StreamElement streamElement;
    
        public StreamElementQueueEntry(StreamElement streamElement) {
            this.streamElement = Preconditions.checkNotNull(streamElement);
        }
    
        public StreamElement getStreamElement() {
            return streamElement;
        }
    
        public boolean isDone() {
            return getFuture().isDone();
        }
    
        public void onComplete(
                final Consumer<StreamElementQueueEntry<T>> completeFunction,
                Executor executor) {
            final StreamElementQueueEntry<T> thisReference = this;
    
            getFuture().whenCompleteAsync(
                // call the complete function for normal completion as well as exceptional completion
                // see FLINK-6435
                (value, throwable) -> completeFunction.accept(thisReference),
                executor);
        }
    
        protected abstract CompletableFuture<T> getFuture();
    
        @Override
        public final boolean isWatermark() {
            return AsyncWatermarkResult.class.isAssignableFrom(getClass());
        }
    
        @Override
        public final boolean isResultCollection() {
            return AsyncCollectionResult.class.isAssignableFrom(getClass());
        }
    
        @Override
        public final AsyncWatermarkResult asWatermark() {
            return (AsyncWatermarkResult) this;
        }
    
        @Override
        public final <T> AsyncCollectionResult<T> asResultCollection() {
            return (AsyncCollectionResult<T>) this;
        }
    }
    
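    • StreamElementQueueEntry implements AsyncResult; its onComplete method registers a callback that runs when the entry's future completes, whether normally or exceptionally, and it declares the abstract method getFuture for subclasses to implement; it has two subclasses, WatermarkQueueEntry and StreamRecordQueueEntry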
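
    The comment in onComplete points out that the callback must run for exceptional completion as well. The following sketch shows that property of CompletableFuture.whenCompleteAsync in isolation. CompletionCallbackSketch is a hypothetical class name, and the direct executor (Runnable::run) is an assumption made here to keep the example deterministic; the operator passes its own executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class CompletionCallbackSketch {
    /** Registers one callback per future and returns how many callbacks have fired. */
    static int countCallbacks() {
        AtomicInteger calls = new AtomicInteger();
        CompletableFuture<String> succeeded = new CompletableFuture<>();
        CompletableFuture<String> failed = new CompletableFuture<>();

        // Runnable::run acts as a direct executor: the callback runs in the completing thread.
        succeeded.whenCompleteAsync((value, throwable) -> calls.incrementAndGet(), Runnable::run);
        failed.whenCompleteAsync((value, throwable) -> calls.incrementAndGet(), Runnable::run);

        succeeded.complete("result");
        failed.completeExceptionally(new RuntimeException("boom"));

        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(countCallbacks()); // prints 2
    }
}
```

    Because the callback fires in both cases, a failed entry at the head of the queue still signals waiting consumers instead of leaving them blocked.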

    WatermarkQueueEntry

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java

    @Internal
    public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {
    
        private final CompletableFuture<Watermark> future;
    
        public WatermarkQueueEntry(Watermark watermark) {
            super(watermark);
    
            this.future = CompletableFuture.completedFuture(watermark);
        }
    
        @Override
        public Watermark getWatermark() {
            return (Watermark) getStreamElement();
        }
    
        @Override
        protected CompletableFuture<Watermark> getFuture() {
            return future;
        }
    }
    
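    • WatermarkQueueEntry extends StreamElementQueueEntry with Watermark as its element type and implements AsyncWatermarkResult; its future is created with CompletableFuture.completedFuture(watermark), so the entry is done from the moment it is constructed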
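
    The difference between an already completed future and a pending one can be checked directly. This is a minimal sketch using only CompletableFuture; WatermarkEntrySketch is a hypothetical class name, and a Long stands in for the Watermark object.

```java
import java.util.concurrent.CompletableFuture;

final class WatermarkEntrySketch {
    /** Returns {watermarkDone, recordDone} for a completed and a pending future. */
    static boolean[] doneFlags() {
        // Mirrors the WatermarkQueueEntry constructor: the future is complete from the start.
        CompletableFuture<Long> watermark = CompletableFuture.completedFuture(42L);
        // Mirrors the StreamRecordQueueEntry constructor: the future waits for a result.
        CompletableFuture<String> record = new CompletableFuture<>();
        return new boolean[] {watermark.isDone(), record.isDone()};
    }

    public static void main(String[] args) {
        boolean[] flags = doneFlags();
        System.out.println(flags[0] + " " + flags[1]); // prints true false
    }
}
```

    In an ordered queue this means a watermark never waits on its own future; it can only be held back by pending entries queued ahead of it.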

    StreamRecordQueueEntry

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java

    @Internal
    public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
        implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {
    
        /** Timestamp information. */
        private final boolean hasTimestamp;
        private final long timestamp;
    
        /** Future containing the collection result. */
        private final CompletableFuture<Collection<OUT>> resultFuture;
    
        public StreamRecordQueueEntry(StreamRecord<?> streamRecord) {
            super(streamRecord);
    
            hasTimestamp = streamRecord.hasTimestamp();
            timestamp = streamRecord.getTimestamp();
    
            resultFuture = new CompletableFuture<>();
        }
    
        @Override
        public boolean hasTimestamp() {
            return hasTimestamp;
        }
    
        @Override
        public long getTimestamp() {
            return timestamp;
        }
    
        @Override
        public Collection<OUT> get() throws Exception {
            return resultFuture.get();
        }
    
        @Override
        protected CompletableFuture<Collection<OUT>> getFuture() {
            return resultFuture;
        }
    
        @Override
        public void complete(Collection<OUT> result) {
            resultFuture.complete(result);
        }
    
        @Override
        public void completeExceptionally(Throwable error) {
            resultFuture.completeExceptionally(error);
        }
    }
    
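    • StreamRecordQueueEntry extends StreamElementQueueEntry and implements both AsyncCollectionResult and ResultFuture; complete and completeExceptionally delegate to its resultFuture, which starts out incomplete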
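
    Since complete and completeExceptionally delegate to a CompletableFuture, only the first completion takes effect. The sketch below shows this with a bare CompletableFuture; ResultFutureSketch is a hypothetical class name, and the TimeoutException merely plays the role of a late failure. How the operator itself schedules timeouts is not covered here.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

final class ResultFutureSketch {
    /** Completes a future twice and reports which attempt succeeded and the final value. */
    static String firstCompletionWins() {
        CompletableFuture<Collection<String>> resultFuture = new CompletableFuture<>();

        // The first completion sets the result and returns true.
        boolean first = resultFuture.complete(Arrays.asList("hit"));
        // A later completion attempt is ignored and returns false.
        boolean second = resultFuture.completeExceptionally(new TimeoutException("late timeout"));

        return first + "," + second + "," + resultFuture.join();
    }

    public static void main(String[] args) {
        System.out.println(firstCompletionWins()); // prints true,false,[hit]
    }
}
```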

    Summary

    • AsyncWaitOperator extends AbstractUdfStreamOperator and overrides its setup, open, initializeState, close and dispose methods; it implements processElement, processWatermark and processLatencyMarker from the OneInputStreamOperator interface, and failOperator from OperatorActions; its open method creates an Emitter and starts it on the AsyncIO-Emitter-Thread
    • Emitter implements Runnable; its job is to take elements out of the StreamElementQueue and write them to a TimestampedCollector; its run method loops, calling streamElementQueue.peekBlockingly() to block until an AsyncResult is available and then passing that result to its output method
    • StreamElementQueue defines the blocking stream element queue that AsyncWaitOperator relies on, with the methods put, tryPut, peekBlockingly, poll, values, isEmpty and size; it has two implementations, UnorderedStreamElementQueue and OrderedStreamElementQueue; the queue elements are of type StreamElementQueueEntry, which implements AsyncResult, provides onComplete to register a completion callback, and declares the abstract method getFuture for its two subclasses, WatermarkQueueEntry and StreamRecordQueueEntry
