Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
简介
Flink的特点是高吞吐低延迟。但是Flink中的某环节的数据处理逻辑需要和外部系统交互,调用耗时不可控会显著降低集群性能,这时候怎么办?
为了解决这个问题,Flink引入了AsyncFunction系列接口。使用这些异步接口调用外部服务的时候,不用再同步等待结果返回,只需要将数据存入队列,外部服务接口返回时会更新队列数据状态。在调用外部服务后直接返回处理下一个异步调用,不需要同步等待结果。下游拉取数据的时候直接从队列获取即可。
使用方法
在讲解AsyncFunction
使用方法之前,我们先“伪造”一个耗时的外部系统调用。调用pullData
会立即返回一个CompletableFuture
。耗时5秒后生成的数据通过CompletableFuture
返回。
public class AsyncIODemo implements Serializable {
private final ExecutorService executorService = Executors.newFixedThreadPool(4);
public CompletableFuture<String> pullData(final String source) {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
executorService.submit(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
completableFuture.complete("Output value: " + source);
});
return completableFuture;
}
}
接下来编写Flink作业:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.fromElements("Alpha", "Beta", "Gamma", "Delta")
val asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction[String, String] {
override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
// 调用前面的外部系统调用,拉取数据
val future = new AsyncIODemo().pullData(input)
// 这个方法是非阻塞的,一旦数据获取成功,会立即调用resultFuture.complete方法
future.whenCompleteAsync(new BiConsumer[String, Throwable] {
override def accept(t: String, u: Throwable): Unit = {
resultFuture.complete(Array(t))
}
})
}
}, 10, TimeUnit.SECONDS)
// 上面设置最长异步调用超时时间为10秒
asyncStream.print()
env.execute()
执行Flink作业。我们发现虽然外部系统调用了4次,然而并没有等待20秒后才输出全部4个结果,实际上只等待了5秒左右。AsyncFunction
的功能得到了验证。
注意:尽管
AsyncFunction
字面上为异步调用,实际上asynInvoke
方法仍然是同步的。绝不能在该方法中阻塞等待调用结果,这样失去了它原本的作用。应该在此处编写异步回调方法,通过异步方式通知Flink数据已获取完毕。
AsyncFunction
从这里开始进入源码分析环节。AsyncFunction
接口源码如下:
@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
/**
* Trigger async operation for each stream input.
*
* @param input element coming from an upstream task
* @param resultFuture to be completed with the result data
* @exception Exception in case of a user code error. An exception will make the task fail and
* trigger fail-over process.
*/
void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
/**
* {@link AsyncFunction#asyncInvoke} timeout occurred. By default, the result future is
* exceptionally completed with a timeout exception.
*
* @param input element coming from an upstream task
* @param resultFuture to be completed with the result data
*/
default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
resultFuture.completeExceptionally(
new TimeoutException("Async function call has timed out."));
}
}
AsyncFunction
接口有两个方法:
- asyncInvoke:异步操作每一个数据流输入元素。方法的第一个参数
input
为数据流中的元素,第二个参数resultFuture
用于收集异步处理的结果或者是错误信息。不要在此方法内同步等待数据处理逻辑,这样会阻塞线程,降低作业吞吐量。 - timeout:定义数据超时处理逻辑。方法的参数和
asyncInvoke
相同。AsyncFunction
已经提供了默认实现。如果需要自定义超时逻辑,可以覆盖这个方法。
ResultFuture
ResultFuture
在异步操作的时候用于收集结果或错误。
@PublicEvolving
public interface ResultFuture<OUT> {
/**
* Completes the result future with a collection of result objects.
*
* <p>Note that it should be called for exactly one time in the user code. Calling this function
* for multiple times will cause data lose.
*
* <p>Put all results in a {@link Collection} and then emit output.
*
* @param result A list of results.
*/
void complete(Collection<OUT> result);
/**
* Completes the result future exceptionally with an exception.
*
* @param error A Throwable object.
*/
void completeExceptionally(Throwable error);
}
它包含两个方法:
- complete:如果异步逻辑顺利返回,调用
complete
方法转入结果数据的集合对象,将数据传递给下游。 - completeExceptionally:如果异步逻辑需要错误,需要调用这个方法将错误传入。
AsyncDataStream
该类是创建异步算子的工具类。它有2种方法:
- unorderedWait:不保证输出元素的顺序和读入元素顺序相同。
- orderedWait:保证输出元素的顺序和读入元素顺序相同。
这两种方法每个还对应两个重载方法,但是参数含义是相同的。参数为:
- DataStream<IN> in:需要添加异步处理逻辑的数据流。
AsyncDataStream
实际上是个工具类,并不是一种流的类型。 - AsyncFunction<IN, OUT> func:用户定义的异步执行逻辑。
- long timeout:异步任务超时时间。
- TimeUnit timeUnit:超时时间单位。
- int capacity:异步任务初始队列长度。只有部分重载方法有这个参数。默认值为100。
下面是orderedWait
其中一个重载方法的代码。
public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
long timeout,
TimeUnit timeUnit,
int capacity) {
return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
}
它调用了addOperator
方法,为DataStream添加一个OneInputTransformation
,其中包含了AsyncWaitOperator
。
其他几个unorderedWait
或orderedWait
重载方法调用的都是addOperator
,不再赘述。
接下来轮到了addOperator
方法:
private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
long timeout,
int bufSize,
OutputMode mode) {
TypeInformation<OUT> outTypeInfo =
TypeExtractor.getUnaryOperatorReturnType(
func,
AsyncFunction.class,
0,
1,
new int[] {1, 0},
in.getType(),
Utils.getCallLocationName(),
true);
// create transform
AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
new AsyncWaitOperatorFactory<>(
in.getExecutionEnvironment().clean(func), timeout, bufSize, mode);
return in.transform("async wait operator", outTypeInfo, operatorFactory);
}
这个方法创建了一个AsyncWaitOperatorFactory
,将其包装入transformation。factory在生成ExecutionGraph
的时候将创建出AsyncWaitOperator
。下一节我们一起分析下异步操作的核心AsyncWaitOperator
。
AsyncWaitOperator
我们从AsyncWaitOperator
的构造方法开始。构造方法参数中最重要的是outputMode
,它决定了异步处理任务队列的类型,从而决定用户数据异步处理后是否严格按照输入顺序输出。
public AsyncWaitOperator(
@Nonnull AsyncFunction<IN, OUT> asyncFunction,
long timeout,
int capacity,
@Nonnull AsyncDataStream.OutputMode outputMode,
@Nonnull ProcessingTimeService processingTimeService,
@Nonnull MailboxExecutor mailboxExecutor) {
super(asyncFunction);
// 设置可以和下游算子组成OperatorChain
setChainingStrategy(ChainingStrategy.ALWAYS);
Preconditions.checkArgument(
capacity > 0, "The number of concurrent async operation should be greater than 0.");
// 默认队列长度
this.capacity = capacity;
// 枚举值,决定用户数据异步处理后是否严格按照输入顺序输出
this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");
// 异步处理超时时间
this.timeout = timeout;
// 时间服务,用于设置定时器,检测超时等
this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
// 用户作业执行线程池
this.mailboxExecutor = mailboxExecutor;
}
在operator创建出来后紧接着会执行setup
方法,进行初始化操作。
@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output) {
// 调用父类初始化逻辑
super.setup(containingTask, config, output);
// 创建元素序列化器
this.inStreamElementSerializer =
new StreamElementSerializer<>(
getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
switch (outputMode) {
case ORDERED:
// 如果需要保持输出数据有序
// 创建的队列为OrderedStreamElementQueue
queue = new OrderedStreamElementQueue<>(capacity);
break;
case UNORDERED:
// 如果不需要保持输出有序
// 创建的队列为UnorderedStreamElementQueue
queue = new UnorderedStreamElementQueue<>(capacity);
break;
default:
throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
}
this.timestampedCollector = new TimestampedCollector<>(output);
}
setup
方法根据outputMode
是否保证输出元素顺序,来决定创建的StreamElementQueue
。
接下来是处理元素的processElement
方法。上游每个元素到来的时候,都会调用这个方法。
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// add element first to the queue
// 将元素放入队列中
// 返回队列的entry
// 队列中的entry类型实现了ResultFuture接口,后面介绍
final ResultFuture<OUT> entry = addToWorkQueue(element);
// 创建ResultHandler,包装了超时定时器,输入数据和resultFuture
// 用来操作resultFuture和超时定时器
final ResultHandler resultHandler = new ResultHandler(element, entry);
// register a timeout for the entry if timeout is configured
// 如果配置了超时时间
if (timeout > 0L) {
// 计算超时时刻
final long timeoutTimestamp =
timeout + getProcessingTimeService().getCurrentProcessingTime();
// 注册一个定时器,在超时的时刻调用AsyncFunction的timeout方法
final ScheduledFuture<?> timeoutTimer =
getProcessingTimeService()
.registerTimer(
timeoutTimestamp,
timestamp ->
userFunction.timeout(
element.getValue(), resultHandler));
// 设置定时器给resultHandler
resultHandler.setTimeoutTimer(timeoutTimer);
}
// 调用AsyncFunction的asyncInvoke方法
userFunction.asyncInvoke(element.getValue(), resultHandler);
}
继续查看addToWorkQueue
方法,将元素放入任务队列中。
private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
throws InterruptedException {
Optional<ResultFuture<OUT>> queueEntry;
// 如果元素添加队列失败,说明队列已满
// 需要当前线程让出执行机会给mailboxExecutor,即执行用户自定义处理逻辑
while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
mailboxExecutor.yield();
}
// 添加队列成功,返回ResultFuture
return queueEntry.get();
}
workQueue我们在后面讨论。接下来分析ResultHandler
。
ResultHandler
ResultHandler
是ResultFuture
的实现类,为AsyncFunction
中两个方法的参数,让用户使用。分别处理异步处理完成(complete
)和异步处理异常(completeExceptionally
)两种情况。
ResultHandler
持有4个成员变量:
- timeoutTimer:定时器,在数据计算完毕(调用了
complete
方法的时候),需要将timer清除,所以需要持有定时器。 - inputRecord:数据流中的原始数据。
- resultFuture:实际为元素队列中的entry。这个后面介绍。
- completed:用来表示异步计算是否完成。
用户的自定义异步处理逻辑在AsyncFunction
中,异步处理完成的时候需要调用ResultHandler
的complete
方法。这个方法将completed
变量标记为true
。然后调用processInMainbox
方法。
@Override
public void complete(Collection<OUT> results) {
Preconditions.checkNotNull(
results, "Results must not be null, use empty collection to emit nothing");
// already completed (exceptionally or with previous complete call from ill-written
// AsyncFunction), so
// ignore additional result
if (!completed.compareAndSet(false, true)) {
return;
}
processInMailbox(results);
}
processInMainbox
方法在MailboxExecutor
线程池执行resultFuture
的complete
方法,通知持有这些元素的队列,该元素已经处理完毕。然后清除掉超时时间timer。最后调用outputCompletedElement
,输出已完成的元素到下游。对应的代码如下所示:
private void processInMailbox(Collection<OUT> results) {
// move further processing into the mailbox thread
mailboxExecutor.execute(
() -> processResults(results),
"Result in AsyncWaitOperator of input %s",
results);
}
private void processResults(Collection<OUT> results) {
// Cancel the timer once we've completed the stream record buffer entry. This will
// remove the registered
// timer task
if (timeoutTimer != null) {
// canceling in mailbox thread avoids
// https://issues.apache.org/jira/browse/FLINK-13635
timeoutTimer.cancel(true);
}
// update the queue entry with the result
resultFuture.complete(results);
// now output all elements from the queue that have been completed (in the correct
// order)
outputCompletedElement();
}
private void outputCompletedElement() {
if (queue.hasCompletedElements()) {
// emit only one element to not block the mailbox thread unnecessarily
queue.emitCompletedElement(timestampedCollector);
// if there are more completed elements, emit them with subsequent mails
if (queue.hasCompletedElements()) {
mailboxExecutor.execute(
this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
}
}
}
StreamElementQueue
这一节我们分析异步处理的核心:StreamElementQueue
。所有需要异步处理的数据都会在此队列中排队。
此队列需要支持是否保持输出元素顺序这两种情形,因此它具有两个实现类:
- OrderedStreamElementQueue:元素输出的顺序严格和输入的顺序一致。
- UnorderedStreamElementQueue:不保证元素输出的顺序和输入的一致。
该接口有如下方法:
@Internal
public interface StreamElementQueue<OUT> {
// 尝试将元素放入队列,如果队列已满,返回Optional.EMPTY
// 返回一个ResultFuture对象
Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement);
// 弹出队列头部一个已经完成异步处理的元素给outputCollector
void emitCompletedElement(TimestampedCollector<OUT> output);
// 检查队列头部元素是否已完成异步处理
boolean hasCompletedElements();
// 其余方法省略
// ...
}
下面分别介绍这两种子类Queue。
OrderedStreamElementQueue
这个队列保证了输出元素顺序和输入元素顺序严格一致。它使用一个Queue<StreamElementQueueEntry<OUT>>
类型队列保存输入数据。Queue使用的是ArrayDeque
类型。
添加元素的tryPut
方法如下。如果添加成功(未超出队列容量限制),返回ResultFuture<OUT>
,否则返回Optional.EMPTY
。
@Override
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
if (queue.size() < capacity) {
// 只有队列有剩余空间的情况下才加入队列
// 根据element的类型(数据还是watermark),构造对应的队列entry
StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);
// 将entry加入队列
queue.add(queueEntry);
LOG.debug(
"Put element into ordered stream element queue. New filling degree "
+ "({}/{}).",
queue.size(),
capacity);
return Optional.of(queueEntry);
} else {
LOG.debug(
"Failed to put element into ordered stream element queue because it "
+ "was full ({}/{}).",
queue.size(),
capacity);
// 如果超出队列容量,返回EMPTY
return Optional.empty();
}
}
createEntry
方法根据element的类型,创建不同的队列entry(StreamElementQueueEntry
)。如果元素是数据类型,创建StreamRecordQueueEntry
,如果元素是watermark,则创建WatermarkQueueEntry
。
private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
if (streamElement.isRecord()) {
return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
}
if (streamElement.isWatermark()) {
return new WatermarkQueueEntry<>((Watermark) streamElement);
}
throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
}
从队列中取出元素的方法为emitCompletedElement
。OrderedStreamElementQueue
从队列的头部获取一个元素,发送给outputCollector
。hasCompletedElements
方法也是检测队列头部的元素是否已经完成异步处理。所以说OrderedStreamElementQueue
能够保证输出数据和输入数据的顺序严格一致。但是带来的问题是处理延迟会受到异步处理时间的影响。
@Override
public boolean hasCompletedElements() {
return !queue.isEmpty() && queue.peek().isDone();
}
@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
if (hasCompletedElements()) {
final StreamElementQueueEntry<OUT> head = queue.poll();
head.emitResult(output);
}
}
UnorderedStreamElementQueue
和OrderedStreamElementQueue
不同的是,UnorderedStreamElementQueue
使用Deque<Segment<OUT>>
类型双向队列来保存输入数据。队列的元素类型为Segment
。需要注意的是,队列中元素的个数并不等于元素的个数,因为一个Segment
可以包含多个元素。
Segment
内部包含了两个集合incompleteElements
和completedElements
,分别保存未完成处理的元素和已完成处理的元素。
/** Unfinished input elements. */
private final Set<StreamElementQueueEntry<OUT>> incompleteElements;
/** Undrained finished elements. */
private final Queue<StreamElementQueueEntry<OUT>> completedElements;
Segment(int initialCapacity) {
incompleteElements = new HashSet<>(initialCapacity);
completedElements = new ArrayDeque<>(initialCapacity);
}
添加元素的时候,需要判断队列entry是否已经异步处理完毕,将其加入相应的集合中。
void add(StreamElementQueueEntry<OUT> queueEntry) {
if (queueEntry.isDone()) {
completedElements.add(queueEntry);
} else {
incompleteElements.add(queueEntry);
}
}
当entry中数据计算完毕的时候,需要调用complete
方法,将这个entry移动到已完成计算的元素集合中。
void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
// adding only to completed queue if not completed before
// there may be a real result coming after a timeout result, which is updated in the
// queue entry but
// the entry is not re-added to the complete queue
if (incompleteElements.remove(elementQueueEntry)) {
completedElements.add(elementQueueEntry);
}
}
在触发计算的时候,需要获取到已经完成计算的元素。获取方法为从completedElements
中poll
一个交给outputCollector
。
int emitCompleted(TimestampedCollector<OUT> output) {
final StreamElementQueueEntry<OUT> completedEntry = completedElements.poll();
if (completedEntry == null) {
return 0;
}
completedEntry.emitResult(output);
return 1;
}
分析到这里不难发现,Segment
放弃了元素顺序保证,将已经完成计算的元素挑出来放置到completedElements
集合中,因此下游在拉取数据的时候,不会因为队列中间有一个长时间未complete的元素而阻塞,从而降低了延迟,并且减少了延迟抖动。
那么问题来了,看似一个Segment
皆可以解决问题,为何需要一个队列来存放Segment
?Segment
是什么时候创建的?如何决定元素加入哪个Segment
?接下来我们讨论这些问题。
首先分析tryPut
方法。
@Override
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
// 检查是否超出队列长度
if (size() < capacity) {
StreamElementQueueEntry<OUT> queueEntry;
// 根据不同的数据类型来生成不同的队列entry
if (streamElement.isRecord()) {
queueEntry = addRecord((StreamRecord<?>) streamElement);
} else if (streamElement.isWatermark()) {
queueEntry = addWatermark((Watermark) streamElement);
} else {
throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
}
numberOfEntries++;
LOG.debug(
"Put element into unordered stream element queue. New filling degree "
+ "({}/{}).",
size(),
capacity);
return Optional.of(queueEntry);
} else {
LOG.debug(
"Failed to put element into unordered stream element queue because it "
+ "was full ({}/{}).",
size(),
capacity);
return Optional.empty();
}
}
对比分析下addRecord
和addWatermark
方法,不难发现端倪。加入record的时候,如果队列中没有Segment
则创建一个新的Segment
,如果有,就在这个Segment
中插入这个record。然而加入watermark这个方法则不同。它还会判断队列中最后一个Segment
是否为空。如果为空,则创建一个新的Segment
再把watermark放入。到这里我们就搞清楚了Segment
是怎么创建和数据如何加入Segment
这两个问题。数据流中每当遇到一个watermark,就会使用新的Segment
。
private StreamElementQueueEntry<OUT> addRecord(StreamRecord<?> record) {
// ensure that there is at least one segment
Segment<OUT> lastSegment;
if (segments.isEmpty()) {
lastSegment = addSegment(capacity);
} else {
lastSegment = segments.getLast();
}
// entry is bound to segment to notify it easily upon completion
StreamElementQueueEntry<OUT> queueEntry =
new SegmentedStreamRecordQueueEntry<>(record, lastSegment);
lastSegment.add(queueEntry);
return queueEntry;
}
private StreamElementQueueEntry<OUT> addWatermark(Watermark watermark) {
Segment<OUT> watermarkSegment;
if (!segments.isEmpty() && segments.getLast().isEmpty()) {
// reuse already existing segment if possible (completely drained) or the new segment
// added at the end of
// this method for two succeeding watermarks
watermarkSegment = segments.getLast();
} else {
watermarkSegment = addSegment(1);
}
StreamElementQueueEntry<OUT> watermarkEntry = new WatermarkQueueEntry<>(watermark);
watermarkSegment.add(watermarkEntry);
// add a new segment for actual elements
addSegment(capacity);
return watermarkEntry;
}
接下来我们看下发送已完成数据这个方法。和加入数据相反,这里获取队列中第一个Segment
,从其中拿出一个已完成计算的元素。最后判断下这个Segment
中是否保存的还有元素,如果没有的话,将这个Segment
从队列中弹出被垃圾回收。但是至少要确保队列中有一个Segment
。
@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
if (segments.isEmpty()) {
return;
}
final Segment currentSegment = segments.getFirst();
numberOfEntries -= currentSegment.emitCompleted(output);
// remove any segment if there are further segments, if not leave it as an optimization even
// if empty
if (segments.size() > 1 && currentSegment.isEmpty()) {
segments.pop();
}
}
通过这种设计UnorderedStreamElementQueue
能够将一连串数据,通过watermark分隔,放入不同的Segment
中。从emitCompletedElement
方法可以看出,只有队列头部的Segment
中的数据全部弹出或超时之后,才有可能去读取下一个Segment
中的数据。这种设计允许一定程度的输出结果乱序,但是乱序程度不可能跨越watermark。从而保证了watermark语义的正确,不会由于乱序的容忍而导致部分数据被意外认为“来迟”。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。
网友评论