A Brief Look at How Flink Broadcast Streams and Broadcast State Work

Author: LittleMagic | Source: published 2020-07-16 22:57

    Prologue

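    A long time ago, in the article "Dynamically Updating Job Configuration with Broadcast in Spark Streaming/Flink", I briefly introduced how to use broadcast streams and broadcast state in the Flink Streaming API. A few days ago someone in a community chat group asked about broadcast state, so here is a somewhat deeper look at it.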

    Broadcast[Connected]Stream

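    Let's review with the earlier diagram. Stream A is an ordinary data stream, and Stream B is a control stream carrying control information and similar data; B is the stream that gets broadcast.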

    Calling DataStream.broadcast() on stream B, with a MapStateDescriptor as the state descriptor, turns it into a BroadcastStream. The source of the method is below; note that it accepts more than one MapStateDescriptor.

    public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
        Preconditions.checkNotNull(broadcastStateDescriptors);
        final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
        return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
    }
    

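    BroadcastPartitioner is the partitioner dedicated to broadcast streams. It does no per-record partitioning: its selectChannel() does not pick a target channel (in recent Flink versions it simply throws UnsupportedOperationException), and the partitioner only reports isBroadcast() as true, so that every record is sent to all downstream channels. How the broadcast stream is combined with the other stream is set up later by connect().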
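    The effect of isBroadcast() can be shown with a tiny plain-Java model. This is a hypothetical sketch, not Flink code: SimplePartitioner, RoundRobinPartitioner, BroadcastLikePartitioner and SimpleRecordWriter are invented names. The writer only mimics the idea that a broadcast partitioner is never asked for a single channel; instead the writer copies each record to every downstream channel.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified partitioner contract (NOT Flink's StreamPartitioner).
interface SimplePartitioner<T> {
    int selectChannel(T record, int numChannels);
    default boolean isBroadcast() { return false; }
}

// Ordinary partitioner: each record goes to exactly one channel, in turn.
class RoundRobinPartitioner<T> implements SimplePartitioner<T> {
    private int next = -1;
    @Override
    public int selectChannel(T record, int numChannels) {
        next = (next + 1) % numChannels;
        return next;
    }
}

// Broadcast-style partitioner: never chooses a channel, only flags broadcast.
class BroadcastLikePartitioner<T> implements SimplePartitioner<T> {
    @Override
    public int selectChannel(T record, int numChannels) {
        throw new UnsupportedOperationException("broadcast partitioner does not select channels");
    }
    @Override
    public boolean isBroadcast() { return true; }
}

// Hypothetical writer: fans out when the partitioner says "broadcast".
class SimpleRecordWriter<T> {
    private final List<List<T>> channels = new ArrayList<>();
    private final SimplePartitioner<T> partitioner;

    SimpleRecordWriter(int numChannels, SimplePartitioner<T> partitioner) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        this.partitioner = partitioner;
    }

    void emit(T record) {
        if (partitioner.isBroadcast()) {
            // Every downstream channel receives a copy of the record.
            for (List<T> channel : channels) {
                channel.add(record);
            }
        } else {
            channels.get(partitioner.selectChannel(record, channels.size())).add(record);
        }
    }

    List<List<T>> channels() { return channels; }
}
```

    With three channels, emitting two records through BroadcastLikePartitioner leaves both records in every channel, whereas the round-robin writer puts each record into exactly one channel.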

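    broadcast() wraps the execution environment (StreamExecutionEnvironment), the DataStream (now carrying the BroadcastPartitioner) and the MapStateDescriptors into a BroadcastStream instance. BroadcastStream itself is very simple, so its code is not shown here.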

    Next, we call DataStream.connect() on data stream A to connect it with the broadcast stream B, which produces a BroadcastConnectedStream.

    public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
        return new BroadcastConnectedStream<>(
                environment,
                this,
                Preconditions.checkNotNull(broadcastStream),
                broadcastStream.getBroadcastStateDescriptor());
    }
    

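    We then call process() on the BroadcastConnectedStream to handle the two streams separately. If data stream A is a KeyedStream, we pass in a KeyedBroadcastProcessFunction; if it is an ordinary DataStream, we pass in a BroadcastProcessFunction. Taking the KeyedStream case as an example, here is the source of process():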

    public <KS, OUT> SingleOutputStreamOperator<OUT> process(
            final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
            final TypeInformation<OUT> outTypeInfo) {
        Preconditions.checkNotNull(function);
        Preconditions.checkArgument(inputStream1 instanceof KeyedStream,
                "A KeyedBroadcastProcessFunction can only be used on a keyed stream.");
    
        TwoInputStreamOperator<IN1, IN2, OUT> operator =
                new CoBroadcastWithKeyedOperator<>(clean(function), broadcastStateDescriptors);
        return transform("Co-Process-Broadcast-Keyed", outTypeInfo, operator);
    }
    

    CoBroadcastWith[Non]KeyedOperator

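    As shown above, the processing function is actually invoked by the CoBroadcastWithKeyedOperator built here (in the non-keyed case the operator is CoBroadcastWithNonKeyedOperator), which is a two-input operator, i.e. a TwoInputStreamOperator. It maintains the following members: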

    private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
    private transient TimestampedCollector<OUT> collector;
    private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
    private transient ReadWriteContextImpl rwContext;
    private transient ReadOnlyContextImpl rContext;
    private transient OnTimerContextImpl onTimerContext;
    

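    broadcastStateDescriptors is the list of state descriptors passed in through broadcast() at the beginning, and broadcastStates maps each state descriptor to its state instance. ReadWriteContextImpl and ReadOnlyContextImpl are the read-write and read-only contexts of KeyedBroadcastProcessFunction, respectively; their role becomes clear below.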

    The initialization logic is in the operator's open() method:

    public void open() throws Exception {
        super.open();
    
        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);
    
        collector = new TimestampedCollector<>(output);
    
        this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
        for (MapStateDescriptor<?, ?> descriptor: broadcastStateDescriptors) {
            broadcastStates.put(descriptor, getOperatorStateBackend().getBroadcastState(descriptor));
        }
    
        rwContext = new ReadWriteContextImpl(getExecutionConfig(), getKeyedStateBackend(), userFunction, broadcastStates, timerService);
        rContext = new ReadOnlyContextImpl(getExecutionConfig(), userFunction, broadcastStates, timerService);
        onTimerContext = new OnTimerContextImpl(getExecutionConfig(), userFunction, broadcastStates, timerService);
    }
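    The bookkeeping in open(), plus the lookup that getBroadcastState() performs later, can be sketched in plain Java without Flink. Descriptor, MapBackedState and BroadcastStateRegistry are hypothetical stand-ins. In this sketch a descriptor is identified by its name only, loosely mirroring Flink's StateDescriptor, whose equality is, as far as I know, based on the descriptor's name and class; that is why a misspelled name produces the "state does not exist" error.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for MapStateDescriptor: equal when the names are equal.
final class Descriptor {
    final String name;

    Descriptor(String name) { this.name = Objects.requireNonNull(name); }

    @Override
    public boolean equals(Object o) {
        return o instanceof Descriptor && ((Descriptor) o).name.equals(name);
    }

    @Override
    public int hashCode() { return name.hashCode(); }
}

// Hypothetical broadcast state: nothing but a HashMap.
final class MapBackedState {
    final Map<String, String> data = new HashMap<>();
}

final class BroadcastStateRegistry {
    private final Map<Descriptor, MapBackedState> states;

    // Mirrors open(): one state instance per registered descriptor.
    BroadcastStateRegistry(List<Descriptor> descriptors) {
        states = new HashMap<>(descriptors.size());
        for (Descriptor d : descriptors) {
            states.put(d, new MapBackedState());
        }
    }

    // Mirrors getBroadcastState(): look up by descriptor, fail on unknown names.
    MapBackedState get(Descriptor d) {
        MapBackedState state = states.get(Objects.requireNonNull(d));
        if (state == null) {
            throw new IllegalArgumentException("The requested state does not exist: " + d.name);
        }
        return state;
    }
}
```

    A fresh Descriptor("rules") finds the state registered under "rules", while Descriptor("rulez") throws IllegalArgumentException.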
    

    Now look at processElement1() and processElement2(), which handle the two inputs: 1 is the data stream and 2 is the broadcast stream.

    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        rContext.setElement(element);
        userFunction.processElement(element.getValue(), rContext, collector);
        rContext.setElement(null);
    }
    
    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        collector.setTimestamp(element);
        rwContext.setElement(element);
        userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
        rwContext.setElement(null);
    }
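    The division of labor in processElement1/2() can be imitated in plain Java. This is a hypothetical sketch: Ctx, MiniBroadcastFunction and MiniBroadcastOperator are invented names, the user function simply receives an int, and the read-only restriction is enforced at run time with Collections.unmodifiableMap, whereas Flink enforces it through the ReadOnlyBroadcastState type. As in the real operator, the current element is set on the context before the user function is called and cleared afterwards.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical context: the current element plus a view of the broadcast state.
final class Ctx {
    Object element;
    final Map<String, Integer> state;

    Ctx(Map<String, Integer> state) { this.state = state; }
}

// Hypothetical user function, standing in for (Keyed)BroadcastProcessFunction.
interface MiniBroadcastFunction {
    void processElement(int value, Ctx readOnlyCtx, List<String> out);
    void processBroadcastElement(int value, Ctx readWriteCtx, List<String> out);
}

// Hypothetical two-input operator mirroring processElement1/2().
final class MiniBroadcastOperator {
    private final MiniBroadcastFunction userFunction;
    private final Ctx rContext;   // data side: unmodifiable view of the state
    private final Ctx rwContext;  // broadcast side: the mutable state itself
    final List<String> collector = new ArrayList<>();

    MiniBroadcastOperator(MiniBroadcastFunction userFunction) {
        Map<String, Integer> broadcastState = new HashMap<>();
        this.userFunction = userFunction;
        this.rContext = new Ctx(Collections.unmodifiableMap(broadcastState));
        this.rwContext = new Ctx(broadcastState);
    }

    // Input 1: the data stream, handled with the read-only context.
    void processElement1(int value) {
        rContext.element = value;
        userFunction.processElement(value, rContext, collector);
        rContext.element = null;
    }

    // Input 2: the broadcast stream, handled with the read-write context.
    void processElement2(int value) {
        rwContext.element = value;
        userFunction.processBroadcastElement(value, rwContext, collector);
        rwContext.element = null;
    }
}
```

    For example, a function whose processBroadcastElement() stores a "threshold" entry and whose processElement() emits only values at or above it drops every data element that arrives before the first broadcast element, and a put() attempted from processElement() fails with UnsupportedOperationException.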
    

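    Note that processElement(), which handles the data stream, receives a ReadOnlyContext, while processBroadcastElement(), which handles the broadcast stream, receives a ReadWriteContext. As we already know, both methods can call Context.getBroadcastState() to obtain the broadcast state (BroadcastState), which is the bridge between the two streams. So how do the two getBroadcastState() implementations differ?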

    // ReadWriteContextImpl.getBroadcastState(), used on the broadcast-stream side
    @Override
    public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
        Preconditions.checkNotNull(stateDescriptor);
        stateDescriptor.initializeSerializerUnlessSet(config);
        BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor);
        if (state == null) {
            throw new IllegalArgumentException("The requested state does not exist. " +
                    "Check for typos in your state descriptor, or specify the state descriptor " +
                    "in the datastream.broadcast(...) call if you forgot to register it.");
        }
        return state;
    }
    
    // ReadOnlyContextImpl.getBroadcastState(), used on the data-stream side
    @Override
    public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
        Preconditions.checkNotNull(stateDescriptor);
        stateDescriptor.initializeSerializerUnlessSet(config);
        ReadOnlyBroadcastState<K, V> state = (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
        if (state == null) {
            throw new IllegalArgumentException("The requested state does not exist. " +
                    "Check for typos in your state descriptor, or specify the state descriptor " +
                    "in the datastream.broadcast(...) call if you forgot to register it.");
        }
        return state;
    }
    

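    The only difference is the type of the returned state: BroadcastState versus ReadOnlyBroadcastState. As the names suggest, the data-stream side can only read the broadcast state, while the broadcast side can both read and write it. This matters because every parallel instance keeps its own copy of the broadcast state: a broadcast element reaches all instances, so an update made on the broadcast side is applied everywhere, whereas a data element reaches only one instance, so a write from the data side would change only that instance's copy and the instances would drift apart, producing inconsistent results.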
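    The divergence argument can be checked with a small simulation. In this hypothetical sketch, ParallelInstances is an invented name; it keeps one HashMap per parallel instance, applies a broadcast update to all of them, and routes a data element to a single instance with a simple hash, which is not Flink's actual key-group assignment. It also assumes every instance sees broadcast updates in the same order, which Flink does not guarantee in general.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: each parallel instance owns a separate copy of the broadcast state.
final class ParallelInstances {
    final List<Map<String, String>> copies = new ArrayList<>();

    ParallelInstances(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            copies.add(new HashMap<>());
        }
    }

    // A broadcast element reaches every instance, so every copy gets the same update.
    void broadcastPut(String key, String value) {
        for (Map<String, String> copy : copies) {
            copy.put(key, value);
        }
    }

    // A data element reaches only the instance its routing key maps to.
    // If the data side were allowed to write, only that one copy would change.
    void hypotheticalDataSidePut(String routingKey, String key, String value) {
        int instance = Math.floorMod(routingKey.hashCode(), copies.size());
        copies.get(instance).put(key, value);
    }

    boolean allCopiesEqual() {
        for (Map<String, String> copy : copies) {
            if (!copy.equals(copies.get(0))) {
                return false;
            }
        }
        return true;
    }
}
```

    After broadcastPut() all copies agree; after a single hypotheticalDataSidePut() they no longer do, for any parallelism above 1.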

    Finally, let's look at how BroadcastState is implemented.

    [ReadOnly]BroadcastState

    The class diagram is as follows.

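    As the diagram shows, both the read-only and the read-write broadcast state are ultimately implemented by HeapBroadcastState; the ReadOnlyBroadcastState interface simply does not expose put()/putAll()/remove().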

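    In the operator's open() method from the previous section, DefaultOperatorStateBackend.getBroadcastState() is called to create the HeapBroadcastState instance, which shows that broadcast state is essentially a kind of operator state. HeapBroadcastState is quite simple and has only two main fields: the state's metadata (name, serializers, and so on) and the HashMap that actually stores the state data.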

    private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;
    private final Map<K, V> backingMap;
    

    All state reads and writes are ultimately reads and writes on the underlying HashMap:

    @Override
    public V get(K key) { return backingMap.get(key); }
    
    @Override
    public void put(K key, V value) { backingMap.put(key, value); }
    
    @Override
    public void putAll(Map<K, V> map) { backingMap.putAll(map); }
    
    @Override
    public void remove(K key) { backingMap.remove(key); }
    
    @Override
    public boolean contains(K key) { return backingMap.containsKey(key); }
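    The class hierarchy and the delegation to a HashMap fit in a few lines of Java. The names below (SimpleReadOnlyState, SimpleBroadcastState, SimpleHeapBroadcastState) are hypothetical stand-ins for ReadOnlyBroadcastState, BroadcastState and HeapBroadcastState, and a plain state name replaces the real metadata object. One heap object implements both interfaces; handing it out under the read-only type is what hides the mutators.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical read-only view: query methods only, like ReadOnlyBroadcastState.
interface SimpleReadOnlyState<K, V> {
    V get(K key);
    boolean contains(K key);
}

// Hypothetical read-write view: adds the mutators, like BroadcastState.
interface SimpleBroadcastState<K, V> extends SimpleReadOnlyState<K, V> {
    void put(K key, V value);
    void putAll(Map<K, V> map);
    void remove(K key);
}

// Hypothetical heap implementation: metadata plus a HashMap; every call delegates.
final class SimpleHeapBroadcastState<K, V> implements SimpleBroadcastState<K, V> {
    private final String stateName;  // stands in for the real metadata (name, serializers, ...)
    private final Map<K, V> backingMap = new HashMap<>();

    SimpleHeapBroadcastState(String stateName) { this.stateName = stateName; }

    String name() { return stateName; }

    @Override public V get(K key) { return backingMap.get(key); }
    @Override public boolean contains(K key) { return backingMap.containsKey(key); }
    @Override public void put(K key, V value) { backingMap.put(key, value); }
    @Override public void putAll(Map<K, V> map) { backingMap.putAll(map); }
    @Override public void remove(K key) { backingMap.remove(key); }
}
```

    Assigning the same SimpleHeapBroadcastState to a SimpleReadOnlyState variable gives the data side a view with no put() at all, while writes through the read-write reference are visible through it immediately, since both point at the same HashMap.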
    

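    So broadcast state always lives in heap memory: even with the RocksDB state backend it is not stored in RocksDB, and it is written to durable storage (such as a file system) only as part of operator state when a checkpoint or savepoint is taken. And because the data stream and the broadcast stream are processed by the same operator instance, which shares one HashMap between the two sides, once the broadcast side changes a key, the data side of that instance sees the change as soon as it processes its next element.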

    The End

        Original link: https://www.haomeiwen.com/subject/ldmxhktx.html