美文网首页实时数据相关flink
Flink使用广播实现配置动态更新

Flink使用广播实现配置动态更新

作者: 猫留下你走吧 | 来源:发表于2019-04-01 17:15 被阅读97次

    本着开源的精神将学习成果分享,转载请注明出处。

    问题复现

    场景

    对每条流数据进行关键字检测,对符合条件的消息进行拦截。例如关键字是 java,则消息 java是世界上最优秀的语言就会被拦截。

    需求

    拦截的关键字不一定是 java,可能需要变更拦截关键词,例如: php。因此关键字必须做到是可配置的。

    问题

    我们首先想到的是存在数据库或外部传入参数。但又因为该关键词是在算子中作为一个变量,一旦作业启动,想修改关键字不得不停掉作业,然后再重新启动作业。繁琐且不友好,有没有什么可以动态的修改算子里变量的方法?

    解决方案

    没错,使用广播的方式去解决。画了2张图表示了他们的区别


    算子处理广播流方式 算子处理数据流方式

    广播和普通的流数据不同的是:广播流的1条流数据能够被算子的所有分区所处理,而数据流的1条流数据只能够被算子的某一分区处理。因此广播流的特点也决定适合做配置的动态更新

    源码

    package com.example.flink;
    
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.util.Collector;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 作者(author):miao
     * 时间(date): 2019-04-01 15:17
     * 功能描述(description):使用广播流实现配置的动态更新
     */
    public class BroadcastStreamDemo {
    
        public static void main(String[] args) throws Exception {
    
            // 构建流处理环境
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 配置处理环境的并发度为4
            environment.setParallelism(4);
    
            final MapStateDescriptor<String, String> CONFIG_KEYWORDS = new MapStateDescriptor<>(
                    "config-keywords",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO);
    
            // 自定义广播流(单例)
            BroadcastStream<String> broadcastStream = environment.addSource(new RichSourceFunction<String>() {
    
                private volatile boolean isRunning = true;
                //测试数据集
                private String[] dataSet = new String[] {
                        "java",
                        "swift",
                        "php",
                        "go",
                        "python"
                };
    
                /**
                 * 数据源:模拟每30秒随机更新一次拦截的关键字
                 * @param ctx
                 * @throws Exception
                 */
                @Override
                public void run(SourceContext<String> ctx) throws Exception {
                    int size = dataSet.length;
                    while (isRunning) {
                        TimeUnit.SECONDS.sleep(30);
                        int seed = (int) (Math.random() * size);
                        //随机选择关键字发送
                        ctx.collect(dataSet[seed]);
                        System.out.println("读取到上游发送的关键字:" + dataSet[seed]);
                    }
                }
    
                @Override
                public void cancel() {
                    isRunning = false;
                }
            }).setParallelism(1).broadcast(CONFIG_KEYWORDS);
    
            // 自定义数据流(单例)
            DataStream<String> dataStream = environment.addSource(new RichSourceFunction<String>() {
    
                private volatile boolean isRunning = true;
    
                //测试数据集
                private String[] dataSet = new String[] {
                        "java是世界上最优秀的语言",
                        "swift是世界上最优秀的语言",
                        "php是世界上最优秀的语言",
                        "go是世界上最优秀的语言",
                        "python是世界上最优秀的语言"
                };
    
                /**
                 * 模拟每3秒随机产生1条消息
                 * @param ctx
                 * @throws Exception
                 */
                @Override
                public void run(SourceContext<String> ctx) throws Exception {
                    int size = dataSet.length;
                    while (isRunning) {
                        TimeUnit.SECONDS.sleep(3);
                        int seed = (int) (Math.random() * size);
                        ctx.collect(dataSet[seed]);
                        System.out.println("读取到上游发送的消息:" + dataSet[seed]);
                    }
                }
    
                @Override
                public void cancel() {
                    isRunning = false;
                }
    
            }).setParallelism(1);
    
            // 数据流和广播流连接处理并将拦截结果打印
            dataStream.connect(broadcastStream).process(new BroadcastProcessFunction<String, String, String>() {
    
                //拦截的关键字
                private String keywords = null;
    
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    keywords = "java";
                    System.out.println("初始化模拟连接数据库读取拦截关键字:java");
                }
    
                @Override
                public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    if (value.contains(keywords)) {
                        out.collect("拦截消息:" + value + ", 原因:包含拦截关键字:" + keywords);
                    }
                }
    
                @Override
                public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                    keywords = value;
                    System.out.println("关键字更新成功,更新拦截关键字:" + value);
                }
            }).print();
    
            // 懒加载执行
            environment.execute();
        }
    
    }
    
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.example.flink</groupId>
        <artifactId>broadcast-stream-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <flink.version>1.7.1</flink.version>
            <java.version>1.8</java.version>
            <scala.version>2.11</scala.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-nop</artifactId>
                <version>1.7.15</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <!-- 指定jdk版本 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <!-- 源码的编译器版本 -->
                        <source>${java.version}</source>
                        <!-- class的编译器版本 -->
                        <target>${java.version}</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.1</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>com.example.flink.BroadcastStreamDemo</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    运行结果

    初始化模拟连接数据库读取拦截关键字:java
    初始化模拟连接数据库读取拦截关键字:java
    初始化模拟连接数据库读取拦截关键字:java
    初始化模拟连接数据库读取拦截关键字:java
    读取到上游发送的消息:java是世界上最优秀的语言
    2> 拦截消息:java是世界上最优秀的语言, 原因:包含拦截关键字:java
    读取到上游发送的消息:go是世界上最优秀的语言
    读取到上游发送的消息:php是世界上最优秀的语言
    读取到上游发送的消息:php是世界上最优秀的语言
    读取到上游发送的消息:swift是世界上最优秀的语言
    读取到上游发送的消息:php是世界上最优秀的语言
    读取到上游发送的消息:java是世界上最优秀的语言
    4> 拦截消息:java是世界上最优秀的语言, 原因:包含拦截关键字:java
    读取到上游发送的消息:python是世界上最优秀的语言
    读取到上游发送的消息:java是世界上最优秀的语言
    2> 拦截消息:java是世界上最优秀的语言, 原因:包含拦截关键字:java
    读取到上游发送的关键字:php
    读取到上游发送的消息:php是世界上最优秀的语言
    关键字更新成功,更新拦截关键字:php
    关键字更新成功,更新拦截关键字:php
    关键字更新成功,更新拦截关键字:php
    关键字更新成功,更新拦截关键字:php
    3> 拦截消息:php是世界上最优秀的语言, 原因:包含拦截关键字:php
    读取到上游发送的消息:go是世界上最优秀的语言
    读取到上游发送的消息:go是世界上最优秀的语言
    读取到上游发送的消息:python是世界上最优秀的语言
    读取到上游发送的消息:php是世界上最优秀的语言
    3> 拦截消息:php是世界上最优秀的语言, 原因:包含拦截关键字:php
    读取到上游发送的消息:python是世界上最优秀的语言
    读取到上游发送的消息:python是世界上最优秀的语言
    读取到上游发送的消息:python是世界上最优秀的语言
    读取到上游发送的消息:go是世界上最优秀的语言
    读取到上游发送的消息:python是世界上最优秀的语言
    读取到上游发送的关键字:swift
    关键字更新成功,更新拦截关键字:swift
    关键字更新成功,更新拦截关键字:swift
    关键字更新成功,更新拦截关键字:swift
    关键字更新成功,更新拦截关键字:swift
    读取到上游发送的消息:python是世界上最优秀的语言
    读取到上游发送的消息:php是世界上最优秀的语言
    读取到上游发送的消息:go是世界上最优秀的语言
    读取到上游发送的消息:python是世界上最优秀的语言
    读取到上游发送的消息:php是世界上最优秀的语言
    读取到上游发送的消息:python是世界上最优秀的语言
    读取到上游发送的消息:java是世界上最优秀的语言
    读取到上游发送的消息:python是世界上最优秀的语言
    读取到上游发送的消息:swift是世界上最优秀的语言
    1> 拦截消息:swift是世界上最优秀的语言, 原因:包含拦截关键字:swift
    读取到上游发送的消息:python是世界上最优秀的语言
    

    参考资料

    聊聊flink的Broadcast State

    相关文章

      网友评论

        本文标题:Flink使用广播实现配置动态更新

        本文链接:https://www.haomeiwen.com/subject/jeyvbqtx.html