屏幕快照 2019-03-21 下午3.01.01.png
屏幕快照 2019-03-21 下午3.02.27.pngprivate static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
private static final long serialVersionUID = 2098635244857937717L;
private transient ExecutorService executorService;
* The result of multiplying sleepFactor with a random float is used to pause
* the working thread in the thread pool, simulating a time consuming async operation.
private final long sleepFactor;
* The ratio to generate an exception to simulate an async error. For example, the error
* may be a TimeoutException while visiting HBase.
private final float failRatio;
private final long shutdownWaitTS;
SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
this.sleepFactor = sleepFactor;
this.failRatio = failRatio;
this.shutdownWaitTS = shutdownWaitTS;
public void open(Configuration parameters) throws Exception {
executorService = Executors.newFixedThreadPool(30);
public void close() throws Exception {
ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
// 没有连接数据库查询,所以模拟异步操作
executorService.submit(() -> {
// wait for while to simulate async operation here
long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
try {
if (ThreadLocalRandom.current().nextFloat() < failRatio) {
resultFuture.completeExceptionally(new Exception("wahahahaha..."));
} else {
Collections.singletonList("key-" + (input % 10)));
} catch (InterruptedException e) {
resultFuture.complete(new ArrayList<>(0));
屏幕快照 2019-03-21 下午3.04.41.png有序
屏幕快照 2019-03-21 下午3.05.07.png
processing time无序
屏幕快照 2019-03-21 下午3.05.15.png
event time无序
屏幕快照 2019-03-21 下午3.05.25.png