前言
之前初步跑了flink官网的词频统计的demo。太初级了,不实用,只是带我进行了以便flink的基本操作。总计下来,我还需要试验这些操作:
- 从kafka读取数据作为数据输入
- 根据计算输出到另一个kafka主题中
- 将后面kafka主题中的内容存储到数据库。
根据这个目的,我设计了这次的demo
设计
计算场景设计
我的实际的主干场景是有一个上报数据的无限流,然后根据配置以及数据的内容,将数据转换为业务粒度的数据投入到后续的数据队列中。在这里,转换前和转换后的数据都是需要进行原样保存的。
这里有一些需要被测试实现的点:
- 从kafka的指定topic中读取数据流,topic名称:data-upload-stream
- 将数据进行计算后投入到新的流中,两个流的转换并不是一对一的。topic名称:transformed-data-stream
- 根据可以动态变更的配置进行计算
- 将数据存储到postgre中
- 如果可以,使用现成的connector或者自定义connector
实现步骤
由于这里存在不少没用过的东西,无论技术是否测试成功,相应的问题还是要解决的。所以,我们从最小的陌生技术集合开始引入,我们分这么几个测试步骤。
数据拉取和投递
该过程,主要测试核心的从kafka拉取数据,并向kafka投递数据。
- 数据发送模拟由一个springmvc项目进行模拟,调用一次接口发送一次数据,以此可控得模拟数据发送
- 从kafka中拉取数据,进行转换后(这里转换规则先写为固定的)投递到另一个主题中
持久化数据
分别从刚才两个主题中拉取数据,并持久化到postgre数据库中。虽然之后最终的持久化容器计划是hbase,但是这里先对postgre进行测试,等本次例子全部完成后再规划对hbase的相关探索。
动态调整配置
这里数据转换的规则是根据配置进行改变的,故这里的配置需要可以动态改变的。根据了解,这里是根据其支持的Broadcast State功能进行支持的。
实现
数据源投递端
这里因为需要和计算端共享数据结构和逻辑,我这里设计了两个项目:
- flink.common:公共项目,用来共享代码逻辑
- flink-mockdata:数据模拟项目,仅实现/mock/push接口
关于kafka的java操作这里就不细述了,因为最近系统性得学习了下kafka的使用关于kafka从调用到运维的细节还需要重新梳理一遍,这里就先完成功能了。
另外,需要创建两个主题,kafka操作命令如下:
bin/kafka-topics.sh --zookeeper 192.168.137.11:2181 --create --topic data-upload-stream --partitions 1 --replication-factor 1
bin/kafka-topics.sh --zookeeper 192.168.137.11:2181 --create --topic transformed-data-stream --partitions 1 --replication-factor 1
需要说明的有这么两点:
- 我使用的是kafka2.1版本,对于低于2.2版本的kafka在创建topic的时候需要指明的参数是zookeeper的地址,而大于等于2.2版本的则需要使用--bootstrap-server指明broker的地址
- 在创建kafka的topic的时候如果使用下划线(_)或者点号(.)作连接符的话则会弹出一个警告信息,即这两个符号相冲突,不可以同时使用。所以这里用减号作连接符。
代码投递完,使用如下命令在服务端消费并查看相关的消息:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.11:9092 --topic data-upload-stream --from-beginning
发送者的主要代码我粘贴如下,仅供demo使用,详细参数设定尚未调试:
/**
* 初始化kafka模块,初始化之后消费者才会生效
*/
public void init(){
//创建生产者
Properties props = new Properties();
props.put("bootstrap.servers",abstractKafkaConfig.getBootstrapServers() );
props.put("acks", "all");
props.put("delivery.timeout.ms", abstractKafkaConfig.getDeleveryTimeoutMS());
props.put("request.timeout.ms",abstractKafkaConfig.getRequestTimeoutMS());
props.put("batch.size", 16384);
props.put("linger.ms", abstractKafkaConfig.getLingerMS()); //发送的延迟时间
props.put("buffer.memory", abstractKafkaConfig.getBufferMemory());
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer<>(props);
}
/**
* 发送消息
* @param topicName 目标topic的名称
* @param key 消息的key
* @param msg 消息的内容
*/
public void sendMessage(String topicName,String key,String msg){
logger.info("kafka-producer:[topic="+topicName+",key="+key+",value="+msg+"]");
producer.send(new ProducerRecord<String, String>(topicName, key, msg));
}
这是两个主要的方法,剩下的自己相应得补齐即可。注意,我把消费者相关的代码删除了。
数据计算端
计算端是这次demo的重点,总体上来说,它的任务分为下面这么几点:
- 从kafka读取到数据
- 从对读取到的数据进行计算
- 将数据流写入到另一个kafka队列
- 将计算规则配置化,并可以动态变更
这些目标我可以分为三个步骤来实现:
- 实现flink计算任务对kafka的读取和写入
- 实现对流的计算
- 实现流计算的动态配置化
接下来我们一步一步得实现这个目标。
实现flink计算任务对kafka的读取和写入
这一步似乎是我最不敢跨出去的一步,我也不知道为什么。我感觉,完全不知道flink对这件事的逻辑,connector该怎么使用什么的。但是,当我听完kafka的课程,硬啃官方文档之后,还是进行了尝试。
选择flink kafka connector
由于对kafka进行了系统性的学习,发现kafka其实还很年轻。它成长很快,导致其早期版本依然有大量用户,但是其早期版本相较现在的版本变化很大。所以,在flink kafka connector中针对不同的kafka版本提出了不同的连接器。具体的信息可以查看官网: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html 这里我们选择的是:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.10.0</version>
</dependency>
从kafka读取数据
先上代码吧
//定义kafka服务相关属性,生产者和消费者均会使用
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.137.11:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "192.168.137.11:2181");
properties.setProperty("group.id", "TrasformToMonitorData");
//创建一个消费者,从kafka队列读取消息
FlinkKafkaConsumer consumer=new FlinkKafkaConsumer("data-upload-stream", new SimpleStringSchema(), properties);
//设置从队列第一条进行读取
consumer.setStartFromEarliest();
//将读取到的数据导入到一个流中
DataStream<String> stream = env.addSource(consumer);
这个定义的过程其实还是蛮简单的,构建一个properties,这应该就是kafka客户端本身需要的那个。然后,定义一个FlinkKafkaConsumer,给出主题名称,序列化方式,把properties丢进去。注意,为了方便调试demo,我这里设置成总是从第一条进行读取,然后就是将这个 consumer作为Source,导入到一个流中。就完成了全部操作。这个时候我们可以直接将stream中的内容打印出来,就可以看到了。
将数据塞入kafka主题
为了解决后顾之忧,我要先测试下如何向主题塞入数据。从结果上来说,蛮简单的,如下:
//创建一个kafka生产者
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
"transformed-data-stream"
,new ProducerStringSerializationSchema("transformed-data-stream")
, properties
,FlinkKafkaProducer.Semantic.EXACTLY_ONCE
); // serialization schema
//将数据流导入到生产者中
stream.addSink(myProducer);
但是,这个过程其实蛮费劲的。因为官网上给出的例子并不是这样的,官网上是这么写的:
DataStream<String> stream = ...;
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema()); // serialization schema
// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);
stream.addSink(myProducer);
我把011去掉后,发现,这个构造函数被抛弃了,如果刚学一个东西就把人家准备抛弃的东西奉为长期使用的东西,其实还是蛮揪心的。于是我看了下其构造函数的源码。发现,其实这个类并没有被抛弃,它给出了一组新的构造函数。没抛弃的构造函数的所有参数我都能搞定,除了KafkaSerializationSchema。我没有找到任何一个它的实现类,而我所要的不过是一个简单的String的实现。经过百度和谷歌两位大神的指点,我的结论是,只能自定义一个,而我刚好也在网上找到了一段可以直接用的源代码: https://stackoverflow.com/questions/58644549/how-to-implement-flinkkafkaproducer-serializer-for-kafka-2-2
public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String> {
private String topic;
public ProducerStringSerializationSchema(String topic) {
super();
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
}
}
然后,我就成功了。
实现对流的计算
刚开始,对于流计算的原语不是很了解,觉得眼前一抹黑。后来,度娘后,啃了下分词的小例子,豁然开朗,我就知道我的计算该怎么写了。
我要用到的,就是基本的数据转换,大概分为如下:
- Map:接受一个元素,输出一个元素。MapFunction<T,V>中T代表输入数据类型(map方法的参数类型),V代表操作结果输出类型(map方法返回数据类型)。
- flatMap:输入一个元素,输出0个、1个或多个元素。FlatMapFunction<T,V>中T代表输入元素数据类型(flatMap方法的第一个参数类型),V代表输出集合中元素类型(flatMap中的Collector类型参数)
- filter:过滤指定元素数据,如果返回true则该元素继续向下传递,如果为false则将该元素过滤掉。FilterFunction<T>中T代表输入元素的数据类型。
- keyBy:逻辑上将数据流元素进行分区,具有相同key的记录将被划分到同一分区。指定Key的方式有多种,这个我们在之前说过了。返回类型KeyedStream<T,KEY>中T代表KeyedStream中元素数据类型,KEY代表虚拟KEY的数据类型。
- reduce:对指定的“虚拟”key相同的记录进行滚动合并,也就是当前元素与最后一次的reduce结果进行reduce操作。ReduceFunction<T>中的T代表KeyStream中元素的数据类型。
- Aggregations:滚动聚合具有相同key的数据流元素,我们可以指定需要聚合的字段(field)。DataStream<T>中的T为聚合之后的结果。
蛮多的,在官网没找到系统说这些的,可能是我英文差吧,找到了个说的蛮全的帖子,有空详细学习一下: https://www.jianshu.com/p/fc4fb559d9f4
总之,我实现的代码如下:
DataStream<String> sequenceDataStream=stream.flatMap(new FlatMapFunction<String,String>(){
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
if(ValidateUtil.strNotEmptyWithTrim(value)) {
UploadData uploadData = JsonUtil.json2Obj(value, UploadData.class);
if (uploadData != null && uploadData.getData()!=null) {
for(Integer dataIndex : uploadData.getData().keySet()){
UploadDataUnit unitData=uploadData.getData().get(dataIndex);
SequenceData sequenceData=new SequenceData();
sequenceData.setUploadDataId(uploadData.getUploadDataId());
sequenceData.setMonitorDataId(1L);//需要根据绑定关系获取该ID
sequenceData.setUploadtime(uploadData.getUploadtime());
sequenceData.setValueType("test_sequence");//需要根据MonitorDataId获取该type值
sequenceData.setValue(Integer.valueOf(String.valueOf(unitData.getValue())));//需要根据数据类型进行数据转换
out.collect(JsonUtil.obj2Json(sequenceData));
}
}
}
}
});
需要解释下,之前我data-upload-stream中读取的是UploadData结构的数据,它里面的data字段可能包含多个数据,每一个是一个transformed-data-stream中的数据,这里面就存在个一转多的问题。所以,我使用了一个flatMap转换,在里面详细定义了转换的规则。在后面,将我sequenceDataStream的内容定义到新的队列输出,就实现了转换。
实现计算配置动态化
这一点我之前认为,flink在计算过程中,由于其线程的生成和管理都是flink运行时管理的,所以就以为需要通过其状态管理机制对这种配置进行管理。后来就详细阅读了其对状态管理的体系设计,发现,它主要是针对在计算过程中,由于计算而产生的状态的管理。而像我这种纯粹是对计算规则的配置化只字未提。所以,我们就尝试下,让它在计算过程中直接从redis中读取数据。
下面是一些代码片段:
创建jedispool
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxWaitMillis(10000);
config.setMaxTotal(50);
config.setMaxIdle(50);
jedisPool = new JedisPool(config, "192.168.137.11", 6379);
使用jedis
Jedis jedis=jedisPool.getResource();
……
Integer test= Integer.valueOf(jedis.get("testValue"));
……
jedis.close();
这里单纯做测试之用,写的就比较捡漏。可喜的是,成功了。需要注意的是,jedisPool需要是可以全局获取的对象,而不可以是临时变量。
网友评论