A Look at Flink's Broadcast State

Author: go4it | Published: 2018-12-26 17:55

    This article takes a look at Flink's Broadcast State.

    Example

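    // Imports used by the test method and the source class below (JUnit 4 assumed).
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.util.Collector;
    import org.junit.Test;

        // Test method; LOGGER and RandomWordSource belong to the enclosing test class and are not shown here.
        @Test
        public void testBroadcastState() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> originStream = env.addSource(new RandomWordSource());
    
            // Descriptor of the broadcast state: a map from config key to config value.
            MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>("dynamicConfig", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
            BroadcastStream<Tuple2<String,String>> configStream = env.addSource(new DynamicConfigSource()).broadcast(descriptor);
    
            BroadcastConnectedStream<String, Tuple2<String,String>> connectStream = originStream.connect(configStream);
            connectStream.process(new BroadcastProcessFunction<String, Tuple2<String,String>, Void>() {
                @Override
                public void processElement(String value, ReadOnlyContext ctx, Collector<Void> out) throws Exception {
                    // The non-broadcast side only gets read-only access to the broadcast state.
                    ReadOnlyBroadcastState<String,String> config = ctx.getBroadcastState(descriptor);
                    String configValue = config.get("demoConfigKey");
                    // Process the element based on the current config value.
                    LOGGER.info("process value:{},config:{}",value,configValue);
                }
    
                @Override
                public void processBroadcastElement(Tuple2<String, String> value, Context ctx, Collector<Void> out) throws Exception {
                    LOGGER.info("receive config item:{}",value);
                    // Update this task's copy of the broadcast state.
                    ctx.getBroadcastState(descriptor).put(value.f0, value.f1);
                }
            });
    
            env.execute("testBroadcastState");
        }
    
    public class DynamicConfigSource implements SourceFunction<Tuple2<String,String>> {
    
        private volatile boolean isRunning = true;
    
        @Override
        public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
            long idx = 1;
            // Emit a new value for the same config key every 10 seconds until cancelled.
            while (isRunning){
                ctx.collect(Tuple2.of("demoConfigKey","value" + idx));
                idx++;
                TimeUnit.SECONDS.sleep(10);
            }
        }
    
        @Override
        public void cancel() {
            isRunning = false;
        }
    }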
    
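    • This example simulates a configuration source that refreshes a config value periodically (every 10 seconds here); each new value is then broadcast to every task.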

    MapStateDescriptor

    flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MapStateDescriptor.java

    @PublicEvolving
    public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {
    
        private static final long serialVersionUID = 1L;
    
        /**
         * Create a new {@code MapStateDescriptor} with the given name and the given type serializers.
         *
         * @param name The name of the {@code MapStateDescriptor}.
         * @param keySerializer The type serializer for the keys in the state.
         * @param valueSerializer The type serializer for the values in the state.
         */
        public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
            super(name, new MapSerializer<>(keySerializer, valueSerializer), null);
        }
    
        /**
         * Create a new {@code MapStateDescriptor} with the given name and the given type information.
         *
         * @param name The name of the {@code MapStateDescriptor}.
         * @param keyTypeInfo The type information for the keys in the state.
         * @param valueTypeInfo The type information for the values in the state.
         */
        public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {
            super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);
        }
    
        /**
         * Create a new {@code MapStateDescriptor} with the given name and the given type information.
         *
         * <p>If this constructor fails (because it is not possible to describe the type via a class),
         * consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor.
         *
         * @param name The name of the {@code MapStateDescriptor}.
         * @param keyClass The class of the type of keys in the state.
         * @param valueClass The class of the type of values in the state.
         */
        public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
            super(name, new MapTypeInfo<>(keyClass, valueClass), null);
        }
    
        @Override
        public Type getType() {
            return Type.MAP;
        }
    
        /**
         * Gets the serializer for the keys in the state.
         *
         * @return The serializer for the keys in the state.
         */
        public TypeSerializer<UK> getKeySerializer() {
            final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer();
            if (!(rawSerializer instanceof MapSerializer)) {
                throw new IllegalStateException("Unexpected serializer type.");
            }
    
            return ((MapSerializer<UK, UV>) rawSerializer).getKeySerializer();
        }
    
        /**
         * Gets the serializer for the values in the state.
         *
         * @return The serializer for the values in the state.
         */
        public TypeSerializer<UV> getValueSerializer() {
            final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer();
            if (!(rawSerializer instanceof MapSerializer)) {
                throw new IllegalStateException("Unexpected serializer type.");
            }
    
            return ((MapSerializer<UK, UV>) rawSerializer).getValueSerializer();
        }
    }
    
    • MapStateDescriptor extends StateDescriptor, with MapState<UK, UV> as the state type and Map<UK, UV> as the value type.

    DataStream.broadcast

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

        /**
         * Sets the partitioning of the {@link DataStream} so that the output elements
         * are broadcasted to every parallel instance of the next operation. In addition,
         * it implicitly as many {@link org.apache.flink.api.common.state.BroadcastState broadcast states}
         * as the specified descriptors which can be used to store the element of the stream.
         *
         * @param broadcastStateDescriptors the descriptors of the broadcast states to create.
         * @return A {@link BroadcastStream} which can be used in the {@link #connect(BroadcastStream)} to
         * create a {@link BroadcastConnectedStream} for further processing of the elements.
         */
        @PublicEvolving
        public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
            Preconditions.checkNotNull(broadcastStateDescriptors);
            final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
            return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
        }
    
        /**
         * Internal function for setting the partitioner for the DataStream.
         *
         * @param partitioner
         *            Partitioner to set.
         * @return The modified DataStream.
         */
        protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
            return new DataStream<>(this.getExecutionEnvironment(), new PartitionTransformation<>(this.getTransformation(), partitioner));
        }
    
        /**
         * Sets the partitioning of the {@link DataStream} so that the output elements
         * are broadcast to every parallel instance of the next operation.
         *
         * @return The DataStream with broadcast partitioning set.
         */
        public DataStream<T> broadcast() {
            return setConnectionType(new BroadcastPartitioner<T>());
        }
    
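    • DataStream's broadcast(MapStateDescriptor...) method first calls setConnectionType with a BroadcastPartitioner, then passes the resulting stream and the MapStateDescriptor arguments to a new BroadcastStream, which it returns. DataStream also has a no-argument broadcast() method, which simply calls setConnectionType and returns a plain DataStream.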
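
    To make the effect of broadcast partitioning concrete, here is a minimal plain-Java model. It is not Flink code: the class BroadcastRoutingSketch and its methods are hypothetical names invented for this illustration, and in-memory lists stand in for the channels to downstream tasks. It contrasts broadcast routing, where every channel receives every record, with round-robin routing, where each record goes to exactly one channel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of record routing; it does not use or mirror Flink's internal classes.
class BroadcastRoutingSketch {

    // Creates one empty in-memory "channel" per downstream parallel instance.
    static List<List<String>> newChannels(int parallelism) {
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayList<>());
        }
        return channels;
    }

    // Broadcast routing: every record is delivered to every channel.
    static List<List<String>> broadcast(List<String> records, int parallelism) {
        List<List<String>> channels = newChannels(parallelism);
        for (String record : records) {
            for (List<String> channel : channels) {
                channel.add(record);
            }
        }
        return channels;
    }

    // Round-robin routing, shown only for contrast: each record goes to exactly one channel.
    static List<List<String>> roundRobin(List<String> records, int parallelism) {
        List<List<String>> channels = newChannels(parallelism);
        for (int i = 0; i < records.size(); i++) {
            channels.get(i % parallelism).add(records.get(i));
        }
        return channels;
    }

    public static void main(String[] args) {
        List<String> configs = Arrays.asList("value1", "value2", "value3");
        System.out.println("broadcast:   " + broadcast(configs, 3));
        System.out.println("round-robin: " + roundRobin(configs, 3));
    }
}
```

    With broadcast routing each of the three channels ends up holding all three config values, which is why every task in the example job can look up the latest config locally.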

    DataStream.connect

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

        /**
         * Creates a new {@link ConnectedStreams} by connecting
         * {@link DataStream} outputs of (possible) different types with each other.
         * The DataStreams connected using this operator can be used with
         * CoFunctions to apply joint transformations.
         *
         * @param dataStream
         *            The DataStream with which this stream will be connected.
         * @return The {@link ConnectedStreams}.
         */
        public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
            return new ConnectedStreams<>(environment, this, dataStream);
        }
    
        /**
         * Creates a new {@link BroadcastConnectedStream} by connecting the current
         * {@link DataStream} or {@link KeyedStream} with a {@link BroadcastStream}.
         *
         * <p>The latter can be created using the {@link #broadcast(MapStateDescriptor[])} method.
         *
         * <p>The resulting stream can be further processed using the {@code BroadcastConnectedStream.process(MyFunction)}
         * method, where {@code MyFunction} can be either a
         * {@link org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction KeyedBroadcastProcessFunction}
         * or a {@link org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction BroadcastProcessFunction}
         * depending on the current stream being a {@link KeyedStream} or not.
         *
         * @param broadcastStream The broadcast stream with the broadcast state to be connected with this stream.
         * @return The {@link BroadcastConnectedStream}.
         */
        @PublicEvolving
        public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
            return new BroadcastConnectedStream<>(
                    environment,
                    this,
                    Preconditions.checkNotNull(broadcastStream),
                    broadcastStream.getBroadcastStateDescriptor());
        }
    
    • DataStream.connect accepts either a DataStream or a BroadcastStream. With a BroadcastStream it returns a BroadcastConnectedStream; with a DataStream it returns an ordinary ConnectedStreams.

    BroadcastConnectedStream.process

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java

    @PublicEvolving
    public class BroadcastConnectedStream<IN1, IN2> {
    
        private final StreamExecutionEnvironment environment;
        private final DataStream<IN1> inputStream1;
        private final BroadcastStream<IN2> inputStream2;
        private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
    
        protected BroadcastConnectedStream(
                final StreamExecutionEnvironment env,
                final DataStream<IN1> input1,
                final BroadcastStream<IN2> input2,
                final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
            this.environment = requireNonNull(env);
            this.inputStream1 = requireNonNull(input1);
            this.inputStream2 = requireNonNull(input2);
            this.broadcastStateDescriptors = requireNonNull(broadcastStateDescriptors);
        }
    
        public StreamExecutionEnvironment getExecutionEnvironment() {
            return environment;
        }
    
        /**
         * Returns the non-broadcast {@link DataStream}.
         *
         * @return The stream which, by convention, is not broadcasted.
         */
        public DataStream<IN1> getFirstInput() {
            return inputStream1;
        }
    
        /**
         * Returns the {@link BroadcastStream}.
         *
         * @return The stream which, by convention, is the broadcast one.
         */
        public BroadcastStream<IN2> getSecondInput() {
            return inputStream2;
        }
    
        /**
         * Gets the type of the first input.
         *
         * @return The type of the first input
         */
        public TypeInformation<IN1> getType1() {
            return inputStream1.getType();
        }
    
        /**
         * Gets the type of the second input.
         *
         * @return The type of the second input
         */
        public TypeInformation<IN2> getType2() {
            return inputStream2.getType();
        }
    
        /**
         * Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
         * {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
         *
         * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
         * @param <KS> The type of the keys in the keyed stream.
         * @param <OUT> The type of the output elements.
         * @return The transformed {@link DataStream}.
         */
        @PublicEvolving
        public <KS, OUT> SingleOutputStreamOperator<OUT> process(final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function) {
    
            TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
                    function,
                    KeyedBroadcastProcessFunction.class,
                    1,
                    2,
                    3,
                    TypeExtractor.NO_INDEX,
                    getType1(),
                    getType2(),
                    Utils.getCallLocationName(),
                    true);
    
            return process(function, outTypeInfo);
        }
    
        /**
         * Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
         * {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
         *
         * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
         * @param outTypeInfo The type of the output elements.
         * @param <KS> The type of the keys in the keyed stream.
         * @param <OUT> The type of the output elements.
         * @return The transformed {@link DataStream}.
         */
        @PublicEvolving
        public <KS, OUT> SingleOutputStreamOperator<OUT> process(
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final TypeInformation<OUT> outTypeInfo) {
    
            Preconditions.checkNotNull(function);
            Preconditions.checkArgument(inputStream1 instanceof KeyedStream,
                    "A KeyedBroadcastProcessFunction can only be used on a keyed stream.");
    
            TwoInputStreamOperator<IN1, IN2, OUT> operator =
                    new CoBroadcastWithKeyedOperator<>(clean(function), broadcastStateDescriptors);
            return transform("Co-Process-Broadcast-Keyed", outTypeInfo, operator);
        }
    
        /**
         * Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link DataStream} and applies the given
         * {@link BroadcastProcessFunction} on them, thereby creating a transformed output stream.
         *
         * @param function The {@link BroadcastProcessFunction} that is called for each element in the stream.
         * @param <OUT> The type of the output elements.
         * @return The transformed {@link DataStream}.
         */
        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(final BroadcastProcessFunction<IN1, IN2, OUT> function) {
    
            TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
                    function,
                    BroadcastProcessFunction.class,
                    0,
                    1,
                    2,
                    TypeExtractor.NO_INDEX,
                    getType1(),
                    getType2(),
                    Utils.getCallLocationName(),
                    true);
    
            return process(function, outTypeInfo);
        }
    
        /**
         * Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link DataStream} and applies the given
         * {@link BroadcastProcessFunction} on them, thereby creating a transformed output stream.
         *
         * @param function The {@link BroadcastProcessFunction} that is called for each element in the stream.
         * @param outTypeInfo The type of the output elements.
         * @param <OUT> The type of the output elements.
         * @return The transformed {@link DataStream}.
         */
        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(
                final BroadcastProcessFunction<IN1, IN2, OUT> function,
                final TypeInformation<OUT> outTypeInfo) {
    
            Preconditions.checkNotNull(function);
            Preconditions.checkArgument(!(inputStream1 instanceof KeyedStream),
                    "A BroadcastProcessFunction can only be used on a non-keyed stream.");
    
            TwoInputStreamOperator<IN1, IN2, OUT> operator =
                    new CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);
            return transform("Co-Process-Broadcast", outTypeInfo, operator);
        }
    
        @Internal
        private <OUT> SingleOutputStreamOperator<OUT> transform(
                final String functionName,
                final TypeInformation<OUT> outTypeInfo,
                final TwoInputStreamOperator<IN1, IN2, OUT> operator) {
    
            // read the output type of the input Transforms to coax out errors about MissingTypeInfo
            inputStream1.getType();
            inputStream2.getType();
    
            TwoInputTransformation<IN1, IN2, OUT> transform = new TwoInputTransformation<>(
                    inputStream1.getTransformation(),
                    inputStream2.getTransformation(),
                    functionName,
                    operator,
                    outTypeInfo,
                    environment.getParallelism());
    
            if (inputStream1 instanceof KeyedStream) {
                KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1;
                TypeInformation<?> keyType1 = keyedInput1.getKeyType();
                transform.setStateKeySelectors(keyedInput1.getKeySelector(), null);
                transform.setStateKeyType(keyType1);
            }
    
            @SuppressWarnings({ "unchecked", "rawtypes" })
            SingleOutputStreamOperator<OUT> returnStream = new SingleOutputStreamOperator(environment, transform);
    
            getExecutionEnvironment().addOperator(transform);
    
            return returnStream;
        }
    
        protected <F> F clean(F f) {
            return getExecutionEnvironment().clean(f);
        }
    }
    
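    • BroadcastConnectedStream.process accepts two kinds of function: KeyedBroadcastProcessFunction, which requires the first input to be a KeyedStream, and BroadcastProcessFunction, which requires it to be non-keyed. Both declare the abstract methods processElement and processBroadcastElement; KeyedBroadcastProcessFunction additionally defines an onTimer method, which is a no-op by default and can be overridden by subclasses.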
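
    The relationship between the two function types can be pictured with a simplified plain-Java model. The classes below are hypothetical stand-ins written for this illustration, not Flink's real classes: the real callbacks also receive a context object and a Collector, and real functions keep shared data in broadcast state rather than in a field. The point is only the shape: both variants declare the same two abstract callbacks, and the keyed variant adds an onTimer hook with an empty default body.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for BroadcastProcessFunction (no contexts, List as collector).
abstract class BroadcastFnSketch<IN1, IN2, OUT> {
    abstract void processElement(IN1 value, List<OUT> out);
    abstract void processBroadcastElement(IN2 value, List<OUT> out);
}

// Hypothetical stand-in for KeyedBroadcastProcessFunction: same callbacks plus onTimer.
abstract class KeyedBroadcastFnSketch<IN1, IN2, OUT> {
    abstract void processElement(IN1 value, List<OUT> out);
    abstract void processBroadcastElement(IN2 value, List<OUT> out);

    // Empty by default, so subclasses override it only when they need timer callbacks.
    void onTimer(long timestamp, List<OUT> out) {
    }
}

class FunctionShapeDemo {

    // Overrides only the two abstract callbacks and inherits the no-op onTimer.
    static class PrefixFn extends KeyedBroadcastFnSketch<String, String, String> {
        private String prefix = "";

        @Override
        void processElement(String value, List<String> out) {
            out.add(prefix + value);
        }

        @Override
        void processBroadcastElement(String value, List<String> out) {
            prefix = value; // remember the latest broadcast value
        }
    }

    static List<String> run() {
        List<String> out = new ArrayList<>();
        PrefixFn fn = new PrefixFn();
        fn.processBroadcastElement("cfg:", out);
        fn.processElement("word", out);
        fn.onTimer(0L, out); // inherited default: emits nothing
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```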

    Summary

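    • Using broadcast state takes three steps: (1) create a MapStateDescriptor and pass it to DataStream.broadcast to obtain a BroadcastStream; (2) connect the stream that should receive the broadcast to the BroadcastStream via DataStream.connect, which returns a BroadcastConnectedStream; (3) call BroadcastConnectedStream.process with a function that implements processElement and processBroadcastElement.
    • BroadcastConnectedStream.process accepts two kinds of function, KeyedBroadcastProcessFunction and BroadcastProcessFunction. Both declare the abstract methods processElement and processBroadcastElement; KeyedBroadcastProcessFunction additionally defines onTimer, which is a no-op by default and can be overridden by subclasses.
    • Broadcast state is always in map format. Every broadcast element is delivered to every task, and each task keeps its own copy of the state: there is no cross-task communication, so a modification made in one task affects only that task. Downstream tasks may receive broadcast events in different orders, so be careful with any element-processing logic that depends on arrival order. Broadcast state is included in checkpoints. Finally, broadcast state is kept in memory at runtime; it is not stored in the RocksDB state backend.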
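
    The ordering caveat can be illustrated with a small plain-Java simulation. It is a model, not Flink code: the class and method names are hypothetical, and each HashMap simply stands in for one task's local copy of the broadcast state. Two tasks receive the same updates in different orders. A "last write wins" update leaves them with different values, while an order-insensitive rule (here, keeping the lexicographically larger value, chosen only for the demo) makes them agree.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; each Map stands in for one task's local broadcast state copy.
class BroadcastOrderSketch {

    // Order-sensitive rule: the most recently received value overwrites the old one.
    static Map<String, String> applyLastWriteWins(List<String[]> updates) {
        Map<String, String> state = new HashMap<>();
        for (String[] kv : updates) {
            state.put(kv[0], kv[1]);
        }
        return state;
    }

    // Order-insensitive rule: keep the larger value, whatever the arrival order.
    static Map<String, String> applyKeepMax(List<String[]> updates) {
        Map<String, String> state = new HashMap<>();
        for (String[] kv : updates) {
            state.merge(kv[0], kv[1], (oldV, newV) -> oldV.compareTo(newV) >= 0 ? oldV : newV);
        }
        return state;
    }

    public static void main(String[] args) {
        String[] u1 = {"demoConfigKey", "value1"};
        String[] u2 = {"demoConfigKey", "value2"};
        List<String[]> seenByTaskA = Arrays.asList(u1, u2);
        List<String[]> seenByTaskB = Arrays.asList(u2, u1); // same events, other order

        System.out.println("last-write-wins A: " + applyLastWriteWins(seenByTaskA));
        System.out.println("last-write-wins B: " + applyLastWriteWins(seenByTaskB));
        System.out.println("keep-max A:        " + applyKeepMax(seenByTaskA));
        System.out.println("keep-max B:        " + applyKeepMax(seenByTaskB));
    }
}
```

    In a real job the update rule would live in processBroadcastElement; the sketch only shows why a rule whose result depends on arrival order can leave parallel tasks with diverging state.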
