序
本文主要研究一下flink的AsyncWaitOperator
AsyncWaitOperator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@Internal
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, OperatorActions {
private static final long serialVersionUID = 1L;
private static final String STATE_NAME = "_async_wait_operator_state_";
/** Capacity of the stream element queue. */
private final int capacity;
/** Output mode for this operator. */
private final AsyncDataStream.OutputMode outputMode;
/** Timeout for the async collectors. */
private final long timeout;
protected transient Object checkpointingLock;
/** {@link TypeSerializer} for inputs while making snapshots. */
private transient StreamElementSerializer<IN> inStreamElementSerializer;
/** Recovered input stream elements. */
private transient ListState<StreamElement> recoveredStreamElements;
/** Queue to store the currently in-flight stream elements into. */
private transient StreamElementQueue queue;
/** Pending stream element which could not yet added to the queue. */
private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;
private transient ExecutorService executor;
/** Emitter for the completed stream element queue entries. */
private transient Emitter<OUT> emitter;
/** Thread running the emitter. */
private transient Thread emitterThread;
public AsyncWaitOperator(
AsyncFunction<IN, OUT> asyncFunction,
long timeout,
int capacity,
AsyncDataStream.OutputMode outputMode) {
super(asyncFunction);
chainingStrategy = ChainingStrategy.ALWAYS;
Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");
this.capacity = capacity;
this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");
this.timeout = timeout;
}
@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
this.checkpointingLock = getContainingTask().getCheckpointLock();
this.inStreamElementSerializer = new StreamElementSerializer<>(
getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
// create the operators executor for the complete operations of the queue entries
this.executor = Executors.newSingleThreadExecutor();
switch (outputMode) {
case ORDERED:
queue = new OrderedStreamElementQueue(
capacity,
executor,
this);
break;
case UNORDERED:
queue = new UnorderedStreamElementQueue(
capacity,
executor,
this);
break;
default:
throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
}
}
@Override
public void open() throws Exception {
super.open();
// create the emitter
this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
// start the emitter thread
this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
emitterThread.setDaemon(true);
emitterThread.start();
// process stream elements from state, since the Emit thread will start as soon as all
// elements from previous state are in the StreamElementQueue, we have to make sure that the
// order to open all operators in the operator chain proceeds from the tail operator to the
// head operator.
if (recoveredStreamElements != null) {
for (StreamElement element : recoveredStreamElements.get()) {
if (element.isRecord()) {
processElement(element.<IN>asRecord());
}
else if (element.isWatermark()) {
processWatermark(element.asWatermark());
}
else if (element.isLatencyMarker()) {
processLatencyMarker(element.asLatencyMarker());
}
else {
throw new IllegalStateException("Unknown record type " + element.getClass() +
" encountered while opening the operator.");
}
}
recoveredStreamElements = null;
}
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
if (timeout > 0L) {
// register a timeout for this AsyncStreamRecordBufferEntry
long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
timeoutTimestamp,
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
userFunction.timeout(element.getValue(), streamRecordBufferEntry);
}
});
// Cancel the timer once we've completed the stream record buffer entry. This will remove
// the register trigger task
streamRecordBufferEntry.onComplete(
(StreamElementQueueEntry<Collection<OUT>> value) -> {
timerFuture.cancel(true);
},
executor);
}
addAsyncBufferEntry(streamRecordBufferEntry);
userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);
addAsyncBufferEntry(watermarkBufferEntry);
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
ListState<StreamElement> partitionableState =
getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
partitionableState.clear();
Collection<StreamElementQueueEntry<?>> values = queue.values();
try {
for (StreamElementQueueEntry<?> value : values) {
partitionableState.add(value.getStreamElement());
}
// add the pending stream element queue entry if the stream element queue is currently full
if (pendingStreamElementQueueEntry != null) {
partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
}
} catch (Exception e) {
partitionableState.clear();
throw new Exception("Could not add stream element queue entries to operator state " +
"backend of operator " + getOperatorName() + '.', e);
}
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
recoveredStreamElements = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
}
@Override
public void close() throws Exception {
try {
assert(Thread.holdsLock(checkpointingLock));
while (!queue.isEmpty()) {
// wait for the emitter thread to output the remaining elements
// for that he needs the checkpointing lock and thus we have to free it
checkpointingLock.wait();
}
}
finally {
Exception exception = null;
try {
super.close();
} catch (InterruptedException interrupted) {
exception = interrupted;
Thread.currentThread().interrupt();
} catch (Exception e) {
exception = e;
}
try {
// terminate the emitter, the emitter thread and the executor
stopResources(true);
} catch (InterruptedException interrupted) {
exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
Thread.currentThread().interrupt();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
if (exception != null) {
LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception);
}
}
}
@Override
public void dispose() throws Exception {
Exception exception = null;
try {
super.dispose();
} catch (InterruptedException interrupted) {
exception = interrupted;
Thread.currentThread().interrupt();
} catch (Exception e) {
exception = e;
}
try {
stopResources(false);
} catch (InterruptedException interrupted) {
exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
Thread.currentThread().interrupt();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
if (exception != null) {
throw exception;
}
}
private void stopResources(boolean waitForShutdown) throws InterruptedException {
emitter.stop();
emitterThread.interrupt();
executor.shutdown();
if (waitForShutdown) {
try {
if (!executor.awaitTermination(365L, TimeUnit.DAYS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
/*
* FLINK-5638: If we have the checkpoint lock we might have to free it for a while so
* that the emitter thread can complete/react to the interrupt signal.
*/
if (Thread.holdsLock(checkpointingLock)) {
while (emitterThread.isAlive()) {
checkpointingLock.wait(100L);
}
}
emitterThread.join();
} else {
executor.shutdownNow();
}
}
private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));
pendingStreamElementQueueEntry = streamElementQueueEntry;
while (!queue.tryPut(streamElementQueueEntry)) {
// we wait for the emitter to notify us if the queue has space left again
checkpointingLock.wait();
}
pendingStreamElementQueueEntry = null;
}
@Override
public void failOperator(Throwable throwable) {
getContainingTask().getEnvironment().failExternally(throwable);
}
}
- AsyncWaitOperator继承了AbstractUdfStreamOperator,覆盖了AbstractUdfStreamOperator的setup、open、initializeState、close、dispose方法;实现了OneInputStreamOperator接口定义的processElement、processWatermark、processLatencyMarker方法;实现了OperatorActions定义的failOperator方法
- setup方法使用Executors.newSingleThreadExecutor()创建了ExecutorService,之后根据不同的outputMode创建不同的StreamElementQueue(
OrderedStreamElementQueue或者UnorderedStreamElementQueue
);open方法使用Emitter创建并启动AsyncIO-Emitter-Thread,另外就是处理recoveredStreamElements,根据不同的类型分别调用processElement、processWatermark、processLatencyMarker方法 - processElement方法首先根据timeout注册一个timer,在ProcessingTimeCallback的onProcessingTime方法里头执行userFunction.timeout,之后将StreamRecordQueueEntry添加到StreamElementQueue中,最后触发userFunction.asyncInvoke;close和dispose方法会调用stopResources方法来关闭资源,不同的是waitForShutdown参数传值不同,close方法传true,而dispose方法传false
Emitter
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/Emitter.java
@Internal
public class Emitter<OUT> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);
/** Lock to hold before outputting. */
private final Object checkpointLock;
/** Output for the watermark elements. */
private final Output<StreamRecord<OUT>> output;
/** Queue to consume the async results from. */
private final StreamElementQueue streamElementQueue;
private final OperatorActions operatorActions;
/** Output for stream records. */
private final TimestampedCollector<OUT> timestampedCollector;
private volatile boolean running;
public Emitter(
final Object checkpointLock,
final Output<StreamRecord<OUT>> output,
final StreamElementQueue streamElementQueue,
final OperatorActions operatorActions) {
this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
this.output = Preconditions.checkNotNull(output, "output");
this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
this.timestampedCollector = new TimestampedCollector<>(this.output);
this.running = true;
}
@Override
public void run() {
try {
while (running) {
LOG.debug("Wait for next completed async stream element result.");
AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
output(streamElementEntry);
}
} catch (InterruptedException e) {
if (running) {
operatorActions.failOperator(e);
} else {
// Thread got interrupted which means that it should shut down
LOG.debug("Emitter thread got interrupted, shutting down.");
}
} catch (Throwable t) {
operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " +
"unexpected throwable.", t));
}
}
private void output(AsyncResult asyncResult) throws InterruptedException {
if (asyncResult.isWatermark()) {
synchronized (checkpointLock) {
AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
LOG.debug("Output async watermark.");
output.emitWatermark(asyncWatermarkResult.getWatermark());
// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();
// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
}
} else {
AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();
if (streamRecordResult.hasTimestamp()) {
timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
} else {
timestampedCollector.eraseTimestamp();
}
synchronized (checkpointLock) {
LOG.debug("Output async stream element collection result.");
try {
Collection<OUT> resultCollection = streamRecordResult.get();
if (resultCollection != null) {
for (OUT result : resultCollection) {
timestampedCollector.collect(result);
}
}
} catch (Exception e) {
operatorActions.failOperator(
new Exception("An async function call terminated with an exception. " +
"Failing the AsyncWaitOperator.", e));
}
// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();
// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
}
}
}
public void stop() {
running = false;
}
}
- Emitter实现了Runnable接口,它主要负责从StreamElementQueue取出element,然后输出到TimestampedCollector
- Emitter的run方法就是不断循环调用streamElementQueue.peekBlockingly()阻塞获取AsyncResult,获取到之后就调用output方法将result输出出去
- Emitter的output方法根据asyncResult是否是watermark做不同处理,不是watermark的话,就会将result通过timestampedCollector.collect输出,如果出现异常则调用operatorActions.failOperator传递异常,最后调用streamElementQueue.poll()来移除队首的元素
StreamElementQueue
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
@Internal
public interface StreamElementQueue {
<T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;
<T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;
AsyncResult peekBlockingly() throws InterruptedException;
AsyncResult poll() throws InterruptedException;
Collection<StreamElementQueueEntry<?>> values() throws InterruptedException;
boolean isEmpty();
int size();
}
- StreamElementQueue接口主要定义了AsyncWaitOperator所要用的blocking stream element queue的接口;它定义了put、tryPut、peekBlockingly、poll、values、isEmpty、size方法;StreamElementQueue接口有两个子类分别是UnorderedStreamElementQueue及OrderedStreamElementQueue;队列元素类型为StreamElementQueueEntry
UnorderedStreamElementQueue
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@Internal
public class UnorderedStreamElementQueue implements StreamElementQueue {
private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
/** Capacity of this queue. */
private final int capacity;
/** Executor to run the onComplete callbacks. */
private final Executor executor;
/** OperatorActions to signal the owning operator a failure. */
private final OperatorActions operatorActions;
/** Queue of uncompleted stream element queue entries segmented by watermarks. */
private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;
/** Queue of completed stream element queue entries. */
private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;
/** First (chronologically oldest) uncompleted set of stream element queue entries. */
private Set<StreamElementQueueEntry<?>> firstSet;
// Last (chronologically youngest) uncompleted set of stream element queue entries. New
// stream element queue entries are inserted into this set.
private Set<StreamElementQueueEntry<?>> lastSet;
private volatile int numberEntries;
/** Locks and conditions for the blocking queue. */
private final ReentrantLock lock;
private final Condition notFull;
private final Condition hasCompletedEntries;
public UnorderedStreamElementQueue(
int capacity,
Executor executor,
OperatorActions operatorActions) {
Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
this.capacity = capacity;
this.executor = Preconditions.checkNotNull(executor, "executor");
this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
this.uncompletedQueue = new ArrayDeque<>(capacity);
this.completedQueue = new ArrayDeque<>(capacity);
this.firstSet = new HashSet<>(capacity);
this.lastSet = firstSet;
this.numberEntries = 0;
this.lock = new ReentrantLock();
this.notFull = lock.newCondition();
this.hasCompletedEntries = lock.newCondition();
}
@Override
public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
while (numberEntries >= capacity) {
notFull.await();
}
addEntry(streamElementQueueEntry);
} finally {
lock.unlock();
}
}
@Override
public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
if (numberEntries < capacity) {
addEntry(streamElementQueueEntry);
LOG.debug("Put element into unordered stream element queue. New filling degree " +
"({}/{}).", numberEntries, capacity);
return true;
} else {
LOG.debug("Failed to put element into unordered stream element queue because it " +
"was full ({}/{}).", numberEntries, capacity);
return false;
}
} finally {
lock.unlock();
}
}
@Override
public AsyncResult peekBlockingly() throws InterruptedException {
lock.lockInterruptibly();
try {
while (completedQueue.isEmpty()) {
hasCompletedEntries.await();
}
LOG.debug("Peeked head element from unordered stream element queue with filling degree " +
"({}/{}).", numberEntries, capacity);
return completedQueue.peek();
} finally {
lock.unlock();
}
}
@Override
public AsyncResult poll() throws InterruptedException {
lock.lockInterruptibly();
try {
while (completedQueue.isEmpty()) {
hasCompletedEntries.await();
}
numberEntries--;
notFull.signalAll();
LOG.debug("Polled element from unordered stream element queue. New filling degree " +
"({}/{}).", numberEntries, capacity);
return completedQueue.poll();
} finally {
lock.unlock();
}
}
@Override
public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
lock.lockInterruptibly();
try {
StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[numberEntries];
array = completedQueue.toArray(array);
int counter = completedQueue.size();
for (StreamElementQueueEntry<?> entry: firstSet) {
array[counter] = entry;
counter++;
}
for (Set<StreamElementQueueEntry<?>> asyncBufferEntries : uncompletedQueue) {
for (StreamElementQueueEntry<?> streamElementQueueEntry : asyncBufferEntries) {
array[counter] = streamElementQueueEntry;
counter++;
}
}
return Arrays.asList(array);
} finally {
lock.unlock();
}
}
@Override
public boolean isEmpty() {
return numberEntries == 0;
}
@Override
public int size() {
return numberEntries;
}
public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
if (firstSet.remove(streamElementQueueEntry)) {
completedQueue.offer(streamElementQueueEntry);
while (firstSet.isEmpty() && firstSet != lastSet) {
firstSet = uncompletedQueue.poll();
Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
while (it.hasNext()) {
StreamElementQueueEntry<?> bufferEntry = it.next();
if (bufferEntry.isDone()) {
completedQueue.offer(bufferEntry);
it.remove();
}
}
}
LOG.debug("Signal unordered stream element queue has completed entries.");
hasCompletedEntries.signalAll();
}
} finally {
lock.unlock();
}
}
private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
assert(lock.isHeldByCurrentThread());
if (streamElementQueueEntry.isWatermark()) {
lastSet = new HashSet<>(capacity);
if (firstSet.isEmpty()) {
firstSet.add(streamElementQueueEntry);
} else {
Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
watermarkSet.add(streamElementQueueEntry);
uncompletedQueue.offer(watermarkSet);
}
uncompletedQueue.offer(lastSet);
} else {
lastSet.add(streamElementQueueEntry);
}
streamElementQueueEntry.onComplete(
(StreamElementQueueEntry<T> value) -> {
try {
onCompleteHandler(value);
} catch (InterruptedException e) {
// The accept executor thread got interrupted. This is probably cause by
// the shutdown of the executor.
LOG.debug("AsyncBufferEntry could not be properly completed because the " +
"executor thread has been interrupted.", e);
} catch (Throwable t) {
operatorActions.failOperator(new Exception("Could not complete the " +
"stream element queue entry: " + value + '.', t));
}
},
executor);
numberEntries++;
}
}
- UnorderedStreamElementQueue实现了StreamElementQueue接口,它emit结果的顺序是无序的,其内部使用了两个ArrayDeque,一个是uncompletedQueue,一个是completedQueue
- peekBlockingly方法首先判断completedQueue是否有元素,没有的话则执行hasCompletedEntries.await(),有则执行completedQueue.peek();put及tryPut都会调用addEntry方法,该方法会往uncompletedQueue队列新增元素,然后同时给每个streamElementQueueEntry的onComplete方法注册一个onCompleteHandler
- onCompleteHandler方法会将执行完成的streamElementQueueEntry从uncompletedQueue移除,然后添加到completedQueue
OrderedStreamElementQueue
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
@Internal
public class OrderedStreamElementQueue implements StreamElementQueue {
private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);
/** Capacity of this queue. */
private final int capacity;
/** Executor to run the onCompletion callback. */
private final Executor executor;
/** Operator actions to signal a failure to the operator. */
private final OperatorActions operatorActions;
/** Lock and conditions for the blocking queue. */
private final ReentrantLock lock;
private final Condition notFull;
private final Condition headIsCompleted;
/** Queue for the inserted StreamElementQueueEntries. */
private final ArrayDeque<StreamElementQueueEntry<?>> queue;
public OrderedStreamElementQueue(
int capacity,
Executor executor,
OperatorActions operatorActions) {
Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
this.capacity = capacity;
this.executor = Preconditions.checkNotNull(executor, "executor");
this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
this.lock = new ReentrantLock(false);
this.headIsCompleted = lock.newCondition();
this.notFull = lock.newCondition();
this.queue = new ArrayDeque<>(capacity);
}
@Override
public AsyncResult peekBlockingly() throws InterruptedException {
lock.lockInterruptibly();
try {
while (queue.isEmpty() || !queue.peek().isDone()) {
headIsCompleted.await();
}
LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
"({}/{}).", queue.size(), capacity);
return queue.peek();
} finally {
lock.unlock();
}
}
@Override
public AsyncResult poll() throws InterruptedException {
lock.lockInterruptibly();
try {
while (queue.isEmpty() || !queue.peek().isDone()) {
headIsCompleted.await();
}
notFull.signalAll();
LOG.debug("Polled head element from ordered stream element queue. New filling degree " +
"({}/{}).", queue.size() - 1, capacity);
return queue.poll();
} finally {
lock.unlock();
}
}
@Override
public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
lock.lockInterruptibly();
try {
StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[queue.size()];
array = queue.toArray(array);
return Arrays.asList(array);
} finally {
lock.unlock();
}
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
@Override
public int size() {
return queue.size();
}
@Override
public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
while (queue.size() >= capacity) {
notFull.await();
}
addEntry(streamElementQueueEntry);
} finally {
lock.unlock();
}
}
@Override
public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
if (queue.size() < capacity) {
addEntry(streamElementQueueEntry);
LOG.debug("Put element into ordered stream element queue. New filling degree " +
"({}/{}).", queue.size(), capacity);
return true;
} else {
LOG.debug("Failed to put element into ordered stream element queue because it " +
"was full ({}/{}).", queue.size(), capacity);
return false;
}
} finally {
lock.unlock();
}
}
private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
assert(lock.isHeldByCurrentThread());
queue.addLast(streamElementQueueEntry);
streamElementQueueEntry.onComplete(
(StreamElementQueueEntry<T> value) -> {
try {
onCompleteHandler(value);
} catch (InterruptedException e) {
// we got interrupted. This indicates a shutdown of the executor
LOG.debug("AsyncBufferEntry could not be properly completed because the " +
"executor thread has been interrupted.", e);
} catch (Throwable t) {
operatorActions.failOperator(new Exception("Could not complete the " +
"stream element queue entry: " + value + '.', t));
}
},
executor);
}
private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
if (!queue.isEmpty() && queue.peek().isDone()) {
LOG.debug("Signal ordered stream element queue has completed head element.");
headIsCompleted.signalAll();
}
} finally {
lock.unlock();
}
}
}
- OrderedStreamElementQueue实现了StreamElementQueue接口,它有序地emit结果,它内部有一个ArrayDeque类型的queue
- peekBlockingly方法首先判断queue是否有元素而且是执行完成的,没有就执行headIsCompleted.await(),有则执行queue.peek();put及tryPut都会调用addEntry方法,该方法会执行queue.addLast(streamElementQueueEntry),然后同时给每个streamElementQueueEntry的onComplete方法注册一个onCompleteHandler
- onCompleteHandler方法会检测执行完成的元素是否是队列的第一个元素,如果是则执行headIsCompleted.signalAll()
AsyncResult
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
@Internal
public interface AsyncResult {
boolean isWatermark();
boolean isResultCollection();
AsyncWatermarkResult asWatermark();
<T> AsyncCollectionResult<T> asResultCollection();
}
- AsyncResult接口定义了StreamElementQueue的元素异步返回的结果要实现的方法,该async result可能是watermark,可能是真正的结果
StreamElementQueueEntry
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
@Internal
public abstract class StreamElementQueueEntry<T> implements AsyncResult {
private final StreamElement streamElement;
public StreamElementQueueEntry(StreamElement streamElement) {
this.streamElement = Preconditions.checkNotNull(streamElement);
}
public StreamElement getStreamElement() {
return streamElement;
}
public boolean isDone() {
return getFuture().isDone();
}
public void onComplete(
final Consumer<StreamElementQueueEntry<T>> completeFunction,
Executor executor) {
final StreamElementQueueEntry<T> thisReference = this;
getFuture().whenCompleteAsync(
// call the complete function for normal completion as well as exceptional completion
// see FLINK-6435
(value, throwable) -> completeFunction.accept(thisReference),
executor);
}
protected abstract CompletableFuture<T> getFuture();
@Override
public final boolean isWatermark() {
return AsyncWatermarkResult.class.isAssignableFrom(getClass());
}
@Override
public final boolean isResultCollection() {
return AsyncCollectionResult.class.isAssignableFrom(getClass());
}
@Override
public final AsyncWatermarkResult asWatermark() {
return (AsyncWatermarkResult) this;
}
@Override
public final <T> AsyncCollectionResult<T> asResultCollection() {
return (AsyncCollectionResult<T>) this;
}
}
- StreamElementQueueEntry实现了AsyncResult接口,它定义了onComplete方法用于结果完成时的回调处理,同时它还定义了抽象方法getFuture供子类实现;它有两个子类,分别是WatermarkQueueEntry及StreamRecordQueueEntry
WatermarkQueueEntry
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
@Internal
public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {
private final CompletableFuture<Watermark> future;
public WatermarkQueueEntry(Watermark watermark) {
super(watermark);
this.future = CompletableFuture.completedFuture(watermark);
}
@Override
public Watermark getWatermark() {
return (Watermark) getStreamElement();
}
@Override
protected CompletableFuture<Watermark> getFuture() {
return future;
}
}
- WatermarkQueueEntry继承了StreamElementQueueEntry,其元素类型为Watermark,同时实现了AsyncWatermarkResult接口
StreamRecordQueueEntry
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
@Internal
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {
/** Timestamp information. */
private final boolean hasTimestamp;
private final long timestamp;
/** Future containing the collection result. */
private final CompletableFuture<Collection<OUT>> resultFuture;
public StreamRecordQueueEntry(StreamRecord<?> streamRecord) {
super(streamRecord);
hasTimestamp = streamRecord.hasTimestamp();
timestamp = streamRecord.getTimestamp();
resultFuture = new CompletableFuture<>();
}
@Override
public boolean hasTimestamp() {
return hasTimestamp;
}
@Override
public long getTimestamp() {
return timestamp;
}
@Override
public Collection<OUT> get() throws Exception {
return resultFuture.get();
}
@Override
protected CompletableFuture<Collection<OUT>> getFuture() {
return resultFuture;
}
@Override
public void complete(Collection<OUT> result) {
resultFuture.complete(result);
}
@Override
public void completeExceptionally(Throwable error) {
resultFuture.completeExceptionally(error);
}
}
- StreamRecordQueueEntry继承了StreamElementQueueEntry,同时实现了AsyncCollectionResult、ResultFuture接口
小结
- AsyncWaitOperator继承了AbstractUdfStreamOperator,覆盖了AbstractUdfStreamOperator的setup、open、initializeState、close、dispose方法;实现了OneInputStreamOperator接口定义的processElement、processWatermark、processLatencyMarker方法;实现了OperatorActions定义的failOperator方法;open方法使用Emitter创建并启动AsyncIO-Emitter-Thread
- Emitter实现了Runnable接口,它主要负责从StreamElementQueue取出element,然后输出到TimestampedCollector;其run方法就是不断循环调用streamElementQueue.peekBlockingly()阻塞获取AsyncResult,获取到之后就调用output方法将result输出出去
- StreamElementQueue接口主要定义了AsyncWaitOperator所要用的blocking stream element queue的接口;它定义了put、tryPut、peekBlockingly、poll、values、isEmpty、size方法;StreamElementQueue接口有两个子类分别是UnorderedStreamElementQueue及OrderedStreamElementQueue;队列元素类型为StreamElementQueueEntry,StreamElementQueueEntry实现了AsyncResult接口,它定义了onComplete方法用于结果完成时的回调处理,同时它还定义了抽象方法getFuture供子类实现;它有两个子类,分别是WatermarkQueueEntry及StreamRecordQueueEntry
网友评论