When using Flink to process real-time data streams, we often need to interact with external systems. For example, when building a real-time data warehouse, incoming messages usually need to be joined with external dimension tables to enrich them with additional dimensional data. Because the response time of the external system and the network latency can be high, a synchronous call pattern means that the latency of every external call directly limits throughput and quickly becomes the bottleneck of the system. In such cases we should use asynchronous calls instead: with asynchronous calls, the waiting times of different requests can overlap, which improves overall throughput.
How to Use Async I/O
Using Async I/O in Flink requires a client that supports asynchronous requests. Take the example given in the official documentation:
/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database-specific client that can issue concurrent requests with callbacks. */
    lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

    /** The context used for the future callbacks. */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // Issue the asynchronous request and receive a Future for the result.
        val resultFutureRequested: Future[String] = client.query(str)

        // Register the callback to be executed once the request issued by the client completes.
        // The callback simply forwards the result to the ResultFuture.
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}
// create the original stream
val stream: DataStream[String] = ...
// Apply the async I/O transformation, configuring the wait mode, the timeout,
// and the maximum number of in-flight asynchronous requests.
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
AsyncDataStream provides two methods, orderedWait and unorderedWait, which correspond to the ordered and unordered output modes respectively. Two modes are offered because the completion time of an asynchronous request is nondeterministic: a request issued earlier may finish later than one issued afterwards. In the "ordered" mode, results are emitted strictly in the order in which the input messages arrived; in the "unordered" mode, results are emitted in the order in which the requests complete, so whichever request finishes first is emitted first. Note that when event time is used, the "unordered" mode still preserves watermark semantics: the results of messages lying between two watermarks may be emitted in any order, but a message that arrived after a watermark is never emitted before a message that arrived before that watermark.
Since the completion time of an asynchronous request is uncertain, you must configure a timeout for each request as well as the maximum number of asynchronous requests that may be in flight at the same time.
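For comparison, here is a minimal Java sketch of the same pattern. The names DimensionJoinFunction and AsyncDatabaseClient are hypothetical and used only for illustration; the sketch assumes the client's query() method returns a CompletableFuture<String>. It also shows how to override timeout() so that a timed-out request emits an empty result instead of failing the job, and how to apply the function with orderedWait:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// A sketch of an asynchronous dimension-table join; AsyncDatabaseClient is a hypothetical
// non-blocking client whose query(String) returns a CompletableFuture<String>.
public class DimensionJoinFunction extends RichAsyncFunction<String, Tuple2<String, String>> {

    private transient AsyncDatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new AsyncDatabaseClient(/* host, port, credentials */);
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<Tuple2<String, String>> resultFuture) {
        // Issue the request; the callback forwards the result (or the error) to the ResultFuture.
        client.query(key).whenComplete((value, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singletonList(Tuple2.of(key, value)));
            }
        });
    }

    @Override
    public void timeout(String key, ResultFuture<Tuple2<String, String>> resultFuture) {
        // The default behavior is to fail the job; here a timed-out request simply yields no result.
        resultFuture.complete(Collections.emptyList());
    }
}

// Applying the function with ordered output (results are emitted in the order the inputs arrived):
DataStream<Tuple2<String, String>> enriched = AsyncDataStream.orderedWait(
    input, new DimensionJoinFunction(), 1000, TimeUnit.MILLISECONDS, 100);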
The Implementation of Async I/O
At runtime, AsyncDataStream is translated into the AsyncWaitOperator operator, a subclass of AbstractUdfStreamOperator. Let's take a look at how AsyncWaitOperator is implemented.
Basic Principles
What sets AsyncWaitOperator apart from other operators is that its input and output are not synchronous. Internally, AsyncWaitOperator therefore uses a "producer-consumer" model that decouples the asynchronous computation from the emission of its results through a queue. StreamElementQueue provides the queue abstraction: a "consumer" thread, the Emitter, takes completed results out of the queue and emits them to the downstream operator, while the asynchronous requests play the role of the "producer". The basic processing flow is described below.
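Before diving into Flink's code, here is a tiny, self-contained sketch of that model (not Flink code; ToyOrderedAsyncQueue is a made-up name): the operator's main thread registers an entry in a bounded queue and kicks off the asynchronous work, while a separate consumer thread blocks on the head of the queue and emits results in arrival order.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// A toy model of the ordered producer-consumer pattern: entries are emitted in insertion
// order, each one only after its future has completed. Flink's real StreamElementQueue also
// keeps the head entry in the queue until it has been emitted, so that the configured capacity
// bounds the number of in-flight requests; this sketch glosses over that detail.
public class ToyOrderedAsyncQueue<T> {

    private final BlockingQueue<CompletableFuture<T>> queue;

    public ToyOrderedAsyncQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // "Producer" side, called for each input record: blocks when the queue is full.
    public CompletableFuture<T> register() throws InterruptedException {
        CompletableFuture<T> entry = new CompletableFuture<>();
        queue.put(entry);
        return entry;   // the asynchronous client completes this future from its callback
    }

    // "Consumer" side (the Emitter): waits for the head entry and forwards its result downstream.
    public void runEmitter(Consumer<T> downstream) throws Exception {
        while (!Thread.currentThread().isInterrupted()) {
            CompletableFuture<T> head = queue.take();   // blocks while the queue is empty
            downstream.accept(head.get());              // blocks until the head result is available
        }
    }
}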
- AsyncWaitOperator (key excerpts)
// The emitter thread that forwards completed queue entries to the downstream operator.
private transient Emitter<OUT> emitter;

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);

    if (timeout > 0L) {
        long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

        // Register a timer that invokes the user function's timeout() method once the request times out.
        final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
            timeoutTimestamp,
            new ProcessingTimeCallback() {
                @Override
                public void onProcessingTime(long timestamp) throws Exception {
                    userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                }
            });

        // Cancel the timer once we've completed the stream record buffer entry. This will remove
        // the registered trigger task.
        streamRecordBufferEntry.onComplete(
            (StreamElementQueueEntry<Collection<OUT>> value) -> {
                timerFuture.cancel(true);
            },
            executor);
    }

    // Add the entry to the stream element queue.
    addAsyncBufferEntry(streamRecordBufferEntry);

    // Fire the asynchronous request by invoking the user function.
    userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
}

private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
    pendingStreamElementQueueEntry = streamElementQueueEntry;

    // Try to put the pending entry into the queue; if the queue is full (the limit of
    // in-flight asynchronous requests has been reached), block until space becomes available.
    while (!queue.tryPut(streamElementQueueEntry)) {
        checkpointingLock.wait();
    }

    pendingStreamElementQueueEntry = null;
}
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
        implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {

    /** Timestamp information. */
    private final boolean hasTimestamp;
    private final long timestamp;

    // The future that will eventually hold the result of the asynchronous call.
    private final CompletableFuture<Collection<OUT>> resultFuture;

    public StreamRecordQueueEntry(StreamRecord<?> streamRecord) {
        super(streamRecord);

        hasTimestamp = streamRecord.hasTimestamp();
        timestamp = streamRecord.getTimestamp();

        // StreamRecordQueueEntry wraps a new CompletableFuture representing the pending asynchronous result.
        resultFuture = new CompletableFuture<>();
    }
}
The Emitter thread consumes completed entries from the queue and forwards them to the configured output (a typical producer-consumer model):
@Override
public void run() {
    try {
        while (running) {
            // Blocking peek: wait until the entry at the head of the queue has completed.
            AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();

            output(streamElementEntry);
        }
    } catch (Throwable t) {
        // Error handling omitted.
    }
}
private void output(AsyncResult asyncResult) throws InterruptedException {
    if (asyncResult.isWatermark()) {
        // The completed entry is a watermark.
        synchronized (checkpointLock) {
            AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

            LOG.debug("Output async watermark.");

            // Emit the watermark.
            output.emitWatermark(asyncWatermarkResult.getWatermark());

            // Remove the entry from the queue.
            streamElementQueue.poll();

            // Notify the main thread that the queue has free space again.
            checkpointLock.notifyAll();
        }
    } else {
        // The completed entry is a record: its result is a collection of output elements.
        AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();

        if (streamRecordResult.hasTimestamp()) {
            timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
        } else {
            timestampedCollector.eraseTimestamp();
        }

        synchronized (checkpointLock) {
            LOG.debug("Output async stream element collection result.");

            try {
                Collection<OUT> resultCollection = streamRecordResult.get();

                if (resultCollection != null) {
                    for (OUT result : resultCollection) {
                        // Emit the result elements.
                        timestampedCollector.collect(result);
                    }
                }
            } catch (Exception e) {
                // The asynchronous call terminated exceptionally: the operator is failed (error handling omitted here).
            }

            // Remove the entry from the queue and wake up the main thread.
            streamElementQueue.poll();
            checkpointLock.notifyAll();
        }
    }
}
AsyncWaitOperator can work in two modes, ORDERED and UNORDERED. Flink implements the two modes through different implementations of StreamElementQueue.
The "Ordered" Mode
In the "ordered" mode, the results of all asynchronous requests must be emitted to the downstream operator in the order in which the input messages arrived. In this mode, the concrete implementation of StreamElementQueue is OrderedStreamElementQueue, which is backed by a bounded queue: entries are appended to the queue in arrival order, and a result can only be taken from the queue once the request at the head of the queue has completed.
public class OrderedStreamElementQueue implements StreamElementQueue {

    // Capacity of the queue.
    private final int capacity;

    /** Executor to run the onCompletion callback. */
    private final Executor executor;

    /** Lock and conditions for the blocking queue. */
    private final ReentrantLock lock;   // initialized as a non-fair lock for better throughput

    // Condition signalled when the queue has free space again.
    private final Condition notFull;

    // Condition signalled when the entry at the head of the queue has completed.
    private final Condition headIsCompleted;

    /** Queue for the inserted StreamElementQueueEntries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> queue;

    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            // Block until the queue is non-empty and the entry at its head has completed.
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }

            return queue.peek();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult poll() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }

            // Signal waiting producers that the queue is about to have free space again.
            notFull.signalAll();

            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (queue.size() >= capacity) {
                // The queue has reached its capacity; block until space is freed.
                notFull.await();
            }

            addEntry(streamElementQueueEntry);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (queue.size() < capacity) {  // capacity not yet reached
                addEntry(streamElementQueueEntry);
                return true;
            } else {
                return false;
            }
        } finally {
            lock.unlock();
        }
    }
}
private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
    lock.lockInterruptibly();

    try {
        if (!queue.isEmpty() && queue.peek().isDone()) {
            // The completion callback has fired and the entry at the head of the queue is done:
            // wake up the Emitter thread waiting on headIsCompleted.
            headIsCompleted.signalAll();
        }
    } finally {
        lock.unlock();
    }
}
The "Unordered" Mode
In the "unordered" mode, the order in which results are emitted is not determined by the arrival order of the messages but by the order in which the asynchronous requests complete. Of course, when event time is used, watermark semantics still have to be preserved; with processing time there are no watermarks, so it can be treated as a special case. UnorderedStreamElementQueue handles both cases rather elegantly.
Internally, UnorderedStreamElementQueue uses two queues: ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue holds the entries whose asynchronous requests have not yet completed, while completedQueue holds the entries whose requests have completed. Note that each element of uncompletedQueue is a set of entries; a watermark is wrapped in a special singleton set (watermarkSet) whose only element is the Watermark itself, and which acts as the boundary between adjacent record sets. This guarantees that the result of an asynchronous request issued after a watermark is never emitted before the results of the requests issued before that watermark. Entries in firstSet whose requests have completed are moved to completedQueue, and all results within firstSet may be emitted out of order with respect to one another. For example, if records r1 and r2 arrive, then a watermark W, then records r3 and r4, the layout becomes firstSet = {r1, r2}, uncompletedQueue = [{W}, {r3, r4}], lastSet = {r3, r4}: r1 and r2 may be emitted in either order, but r3 and r4 can only be emitted after both r1 and r2 have completed and W has been emitted.
- If event time is not used, no watermarks are generated; all asynchronous requests end up in firstSet, so all results are emitted purely in completion order.
- firstSet holds the entries that arrived before the current watermark boundary. When a request completes, onCompleteHandler removes its entry from firstSet and puts it into completedQueue; once firstSet has been fully drained, the next set of pending entries is pulled from uncompletedQueue.
- lastSet is the set that newly arriving record entries are appended to. As long as no watermark has been seen, lastSet and firstSet refer to the same set, which is exactly what the firstSet != lastSet check in onCompleteHandler tests for: it prevents the queue from advancing past the set that is still being filled.
private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
    if (streamElementQueueEntry.isWatermark()) {
        // The entry is a watermark: point lastSet at a fresh set for the records that follow it.
        lastSet = new HashSet<>(capacity);

        if (firstSet.isEmpty()) {
            // firstSet is empty: the watermark entry becomes its only element.
            firstSet.add(streamElementQueueEntry);
        } else {
            // firstSet is not empty: wrap the watermark in a singleton set and append it to uncompletedQueue.
            Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
            watermarkSet.add(streamElementQueueEntry);
            uncompletedQueue.offer(watermarkSet);
        }

        uncompletedQueue.offer(lastSet);
    } else {
        // A record entry is added to lastSet (identical to firstSet as long as no watermark has been seen).
        lastSet.add(streamElementQueueEntry);
    }

    // Register the callback that fires when the asynchronous request completes.
    streamElementQueueEntry.onComplete(
        (StreamElementQueueEntry<T> value) -> {
            try {
                onCompleteHandler(value);
            } catch (InterruptedException e) {
                // ... omitted
            }
        },
        executor);

    numberEntries++;
}
public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
    lock.lockInterruptibly();

    try {
        if (firstSet.remove(streamElementQueueEntry)) {
            completedQueue.offer(streamElementQueueEntry);

            // Without watermarks the loop is never entered, because firstSet == lastSet.
            while (firstSet.isEmpty() && firstSet != lastSet) {
                // firstSet is empty, meaning every entry before the current watermark boundary has
                // completed; pull the next pending set from uncompletedQueue and make it firstSet.
                firstSet = uncompletedQueue.poll();

                // Move the entries of the new firstSet that have already completed
                // into completedQueue and remove them from firstSet.
                Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();

                while (it.hasNext()) {
                    StreamElementQueueEntry<?> bufferEntry = it.next();

                    if (bufferEntry.isDone()) {
                        completedQueue.offer(bufferEntry);
                        it.remove();
                    }
                }
            }

            LOG.debug("Signal unordered stream element queue has completed entries.");
            hasCompletedEntries.signalAll();
        }
    } finally {
        lock.unlock();
    }
}
Finally, here is a complete example of an asynchronous I/O function:
private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
    private static final long serialVersionUID = 2098635244857937717L;

    private transient ExecutorService executorService;

    /**
     * The result of multiplying sleepFactor with a random float is used to pause
     * the working thread in the thread pool, simulating a time consuming async operation.
     */
    private final long sleepFactor;

    /**
     * The ratio to generate an exception to simulate an async error. For example, the error
     * may be a TimeoutException while visiting HBase.
     */
    private final float failRatio;

    private final long shutdownWaitTS;

    SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
        this.sleepFactor = sleepFactor;
        this.failRatio = failRatio;
        this.shutdownWaitTS = shutdownWaitTS;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executorService = Executors.newFixedThreadPool(30);
    }

    @Override
    public void close() throws Exception {
        super.close();
        ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
    }

    @Override
    public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
        executorService.submit(() -> {
            // Wait for a while to simulate an asynchronous operation here.
            long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
            try {
                Thread.sleep(sleep);

                if (ThreadLocalRandom.current().nextFloat() < failRatio) {
                    resultFuture.completeExceptionally(new Exception("wahahahaha..."));
                } else {
                    resultFuture.complete(
                        Collections.singletonList("key-" + (input % 10)));
                }
            } catch (InterruptedException e) {
                resultFuture.complete(new ArrayList<>(0));
            }
        });
    }
}
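To run this function, it has to be wired into a job with AsyncDataStream. The following is only a sketch: the class name AsyncIOJob, the source, and the parameter values (sleep factor, fail ratio, shutdown wait, timeout, capacity) are illustrative, and SampleAsyncFunction is assumed to be visible from this class.

import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncIOJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5);

        // Apply the async function with a 10 s timeout and at most 20 in-flight requests.
        DataStream<String> result = AsyncDataStream.unorderedWait(
            source,
            new SampleAsyncFunction(100L, 0.001f, 20_000L), // sleepFactor, failRatio, shutdownWaitTS
            10_000L, TimeUnit.MILLISECONDS,
            20);

        result.print();

        env.execute("Async I/O example");
    }
}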
Fault Tolerance
In asynchronous mode there may be many requests in flight at the same time. When a snapshot is taken, every message whose asynchronous call has not yet completed, or whose result has not yet been emitted downstream, must therefore be stored in the operator state. On recovery, these messages are read back from the state and reprocessed. To preserve the exactly-once guarantee, messages whose asynchronous calls have completed and whose results have already been emitted downstream by the Emitter do not need to be included in the snapshot.
public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, OperatorActions {

    /** Recovered input stream elements. */
    private transient ListState<StreamElement> recoveredStreamElements;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        recoveredStreamElements = context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    }

    @Override
    public void open() throws Exception {
        super.open();

        //......

        // On recovery, take all unfinished elements out of the state and process them again.
        if (recoveredStreamElements != null) {
            for (StreamElement element : recoveredStreamElements.get()) {
                if (element.isRecord()) {
                    processElement(element.<IN>asRecord());
                }
                else if (element.isWatermark()) {
                    processWatermark(element.asWatermark());
                }
                else if (element.isLatencyMarker()) {
                    processLatencyMarker(element.asLatencyMarker());
                }
                else {
                    throw new IllegalStateException("Unknown record type " + element.getClass() +
                        " encountered while opening the operator.");
                }
            }
            recoveredStreamElements = null;
        }
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);

        // Clear the previously snapshotted state first.
        ListState<StreamElement> partitionableState =
            getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
        partitionableState.clear();

        // Add the elements of all requests that have not been fully processed to the state.
        Collection<StreamElementQueueEntry<?>> values = queue.values();

        try {
            for (StreamElementQueueEntry<?> value : values) {
                partitionableState.add(value.getStreamElement());
            }

            // Add the pending stream element queue entry if the stream element queue is currently full.
            if (pendingStreamElementQueueEntry != null) {
                partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
            }
        } catch (Exception e) {
            partitionableState.clear();

            throw new Exception("Could not add stream element queue entries to operator state " +
                "backend of operator " + getOperatorName() + '.', e);
        }
    }
}