Flink 自定义source、sink 是如何起作用的

作者: shengjk1 | 来源:发表于2019-06-24 20:29 被阅读0次

    自从学会自定义source之后,一直都比较好奇,为什么我实现一个 *SourceFunction,我自己定义的代码就可以跟 Flink很好的整合在一起?
    下面以 RichParallelSourceFunction 为例,来具体看一下究竟是自定义 source 是如何执行的

    首先看一下 Flink中的抽象类 AbstractUdfStreamOperator,专门负责Rich*Function的 open 和close方法

    ......
    
        // flink 提供的 Rich*Function 系列算子的 open 和 close 方法被执行的地方
        @Override
        public void open() throws Exception {
            super.open();
    //关键性方法 负责执行我们重写的open方法
            FunctionUtils.openFunction(userFunction, new Configuration());
        }
    
    //关键性方法 负责执行我们重写的close方法
        @Override
        public void close() throws Exception {
            super.close();
            functionsClosed = true;
            FunctionUtils.closeFunction(userFunction);
        }
    ......
    

    再继续看一下StreamSource

    ......
    //生成上下文之后,接下来就是把上下文交给 SourceFunction 去执行,调用用户重写的run方法开始正式运行
                userFunction.run(ctx);
    
                // if we get here, then the user function either exited after being done (finite source)
                // or the function was canceled or stopped. For the finite source case, we should emit
                // a final watermark that indicates that we reached the end of event-time
                if (!isCanceledOrStopped()) {
                    ctx.emitWatermark(Watermark.MAX_WATERMARK);
                }
    ......
    
    //执行我们自己重写的 cancel 方法
    public void cancel() {
            // important: marking the source as stopped has to happen before the function is stopped.
            // the flag that tracks this status is volatile, so the memory model also guarantees
            // the happens-before relationship
            markCanceledOrStopped();
            userFunction.cancel();
    
            // the context may not be initialized if the source was never running.
            if (ctx != null) {
                ctx.close();
            }
        }
    ......
    

    自此为止,我们自定义source function 的 open、close、cancel、run方法就都可以正常的调用运行了,然后就可以源源不断的产生数据了。

    sink也是类似的。首先通过AbstractUdfStreamOperator类调用 open、close方法,然后还有 StreamSink调用 自定义中的 invoke 方法。

    相关文章

      网友评论

        本文标题:Flink 自定义source、sink 是如何起作用的

        本文链接:https://www.haomeiwen.com/subject/dqkaqctx.html