美文网首页flink
Flink学习笔记之七AsycIO

Flink学习笔记之七AsycIO

作者: 天火燎原_e548 | 来源:发表于2019-03-26 18:54 被阅读4次

    1.什么是异步IO

    ,当请求外部系统或者耗时操作,需要异步IO


    屏幕快照 2019-03-21 下午3.01.01.png

    2.AsyncDataStream

    屏幕快照 2019-03-21 下午3.02.27.png
    private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
    private static final long serialVersionUID = 2098635244857937717L;
    
    private transient ExecutorService executorService;
    
    /**
     * The result of multiplying sleepFactor with a random float is used to pause
     * the working thread in the thread pool, simulating a time consuming async operation.
     */
    private final long sleepFactor;
    
    /**
     * The ratio to generate an exception to simulate an async error. For example, the error
     * may be a TimeoutException while visiting HBase.
     */
    private final float failRatio;
    
    private final long shutdownWaitTS;
    
    SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
        this.sleepFactor = sleepFactor;
        this.failRatio = failRatio;
        this.shutdownWaitTS = shutdownWaitTS;
    }
    
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    
        executorService = Executors.newFixedThreadPool(30);
    }
    
    @Override
    public void close() throws Exception {
        super.close();
        ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
    }
    
    @Override
    public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
        // 没有连接数据库查询,所以模拟异步操作
        executorService.submit(() -> {
            // wait for while to simulate async operation here
            long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
            try {
                Thread.sleep(sleep);
    
                if (ThreadLocalRandom.current().nextFloat() < failRatio) {
                    resultFuture.completeExceptionally(new Exception("wahahahaha..."));
                } else {
                    resultFuture.complete(
                            Collections.singletonList("key-" + (input % 10)));
                }
            } catch (InterruptedException e) {
                resultFuture.complete(new ArrayList<>(0));
            }
        });
    }
    }
    

    3.实现原理

    屏幕快照 2019-03-21 下午3.04.41.png

    有序


    屏幕快照 2019-03-21 下午3.05.07.png

    processing time无序


    屏幕快照 2019-03-21 下午3.05.15.png
    event time无序
    屏幕快照 2019-03-21 下午3.05.25.png

    4.快照恢复

    屏幕快照 2019-03-21 下午3.07.38.png

    相关文章

      网友评论

        本文标题:Flink学习笔记之七AsycIO

        本文链接:https://www.haomeiwen.com/subject/dvvnvqtx.html