本着开源的精神将学习成果分享,转载请注明出处。
问题复现
场景
对每条流数据进行关键字检测,对符合条件的消息进行拦截。例如关键字是 java
,则消息 java是世界上最优秀的语言
就会被拦截。
需求
拦截的关键字不一定是 java
,可能需要变更拦截关键词,例如: php
。因此关键字必须做到是可配置的。
问题
我们首先想到的是存在数据库或外部传入参数。但又因为该关键词是在算子中作为一个变量,一旦作业启动,想修改关键字不得不停掉作业,然后再重新启动作业。繁琐且不友好,有没有什么可以动态的修改算子里变量的方法?
解决方案
没错,使用广播的方式去解决。画了2张图表示了他们的区别
算子处理广播流方式 算子处理数据流方式
广播和普通的流数据不同的是:广播流的1条流数据能够被算子的所有分区所处理,而数据流的1条流数据只能够被算子的某一分区处理。因此广播流的特点也决定适合做配置的动态更新
源码
package com.example.flink;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;
/**
* 作者(author):miao
* 时间(date): 2019-04-01 15:17
* 功能描述(description):使用广播流实现配置的动态更新
*/
public class BroadcastStreamDemo {
public static void main(String[] args) throws Exception {
// 构建流处理环境
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置处理环境的并发度为4
environment.setParallelism(4);
final MapStateDescriptor<String, String> CONFIG_KEYWORDS = new MapStateDescriptor<>(
"config-keywords",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
// 自定义广播流(单例)
BroadcastStream<String> broadcastStream = environment.addSource(new RichSourceFunction<String>() {
private volatile boolean isRunning = true;
//测试数据集
private String[] dataSet = new String[] {
"java",
"swift",
"php",
"go",
"python"
};
/**
* 数据源:模拟每30秒随机更新一次拦截的关键字
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<String> ctx) throws Exception {
int size = dataSet.length;
while (isRunning) {
TimeUnit.SECONDS.sleep(30);
int seed = (int) (Math.random() * size);
//随机选择关键字发送
ctx.collect(dataSet[seed]);
System.out.println("读取到上游发送的关键字:" + dataSet[seed]);
}
}
@Override
public void cancel() {
isRunning = false;
}
}).setParallelism(1).broadcast(CONFIG_KEYWORDS);
// 自定义数据流(单例)
DataStream<String> dataStream = environment.addSource(new RichSourceFunction<String>() {
private volatile boolean isRunning = true;
//测试数据集
private String[] dataSet = new String[] {
"java是世界上最优秀的语言",
"swift是世界上最优秀的语言",
"php是世界上最优秀的语言",
"go是世界上最优秀的语言",
"python是世界上最优秀的语言"
};
/**
* 模拟每3秒随机产生1条消息
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<String> ctx) throws Exception {
int size = dataSet.length;
while (isRunning) {
TimeUnit.SECONDS.sleep(3);
int seed = (int) (Math.random() * size);
ctx.collect(dataSet[seed]);
System.out.println("读取到上游发送的消息:" + dataSet[seed]);
}
}
@Override
public void cancel() {
isRunning = false;
}
}).setParallelism(1);
// 数据流和广播流连接处理并将拦截结果打印
dataStream.connect(broadcastStream).process(new BroadcastProcessFunction<String, String, String>() {
//拦截的关键字
private String keywords = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
keywords = "java";
System.out.println("初始化模拟连接数据库读取拦截关键字:java");
}
@Override
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
if (value.contains(keywords)) {
out.collect("拦截消息:" + value + ", 原因:包含拦截关键字:" + keywords);
}
}
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
keywords = value;
System.out.println("关键字更新成功,更新拦截关键字:" + value);
}
}).print();
// 懒加载执行
environment.execute();
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.flink</groupId>
<artifactId>broadcast-stream-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.7.1</flink.version>
<java.version>1.8</java.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.15</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 指定jdk版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<!-- 源码的编译器版本 -->
<source>${java.version}</source>
<!-- class的编译器版本 -->
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.flink.BroadcastStreamDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
运行结果
初始化模拟连接数据库读取拦截关键字:java
初始化模拟连接数据库读取拦截关键字:java
初始化模拟连接数据库读取拦截关键字:java
初始化模拟连接数据库读取拦截关键字:java
读取到上游发送的消息:java是世界上最优秀的语言
2> 拦截消息:java是世界上最优秀的语言, 原因:包含拦截关键字:java
读取到上游发送的消息:go是世界上最优秀的语言
读取到上游发送的消息:php是世界上最优秀的语言
读取到上游发送的消息:php是世界上最优秀的语言
读取到上游发送的消息:swift是世界上最优秀的语言
读取到上游发送的消息:php是世界上最优秀的语言
读取到上游发送的消息:java是世界上最优秀的语言
4> 拦截消息:java是世界上最优秀的语言, 原因:包含拦截关键字:java
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:java是世界上最优秀的语言
2> 拦截消息:java是世界上最优秀的语言, 原因:包含拦截关键字:java
读取到上游发送的关键字:php
读取到上游发送的消息:php是世界上最优秀的语言
关键字更新成功,更新拦截关键字:php
关键字更新成功,更新拦截关键字:php
关键字更新成功,更新拦截关键字:php
关键字更新成功,更新拦截关键字:php
3> 拦截消息:php是世界上最优秀的语言, 原因:包含拦截关键字:php
读取到上游发送的消息:go是世界上最优秀的语言
读取到上游发送的消息:go是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:php是世界上最优秀的语言
3> 拦截消息:php是世界上最优秀的语言, 原因:包含拦截关键字:php
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:go是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的关键字:swift
关键字更新成功,更新拦截关键字:swift
关键字更新成功,更新拦截关键字:swift
关键字更新成功,更新拦截关键字:swift
关键字更新成功,更新拦截关键字:swift
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:php是世界上最优秀的语言
读取到上游发送的消息:go是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:php是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:java是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:swift是世界上最优秀的语言
1> 拦截消息:swift是世界上最优秀的语言, 原因:包含拦截关键字:swift
读取到上游发送的消息:python是世界上最优秀的语言
网友评论