美文网首页
flink实用demo——kafka数据转换

flink实用demo——kafka数据转换

作者: 本然酋长 | 来源:发表于2020-02-18 23:51 被阅读0次

前言

之前初步跑了flink官网的词频统计的demo。太初级了,不实用,只是带我进行了以便flink的基本操作。总计下来,我还需要试验这些操作:

  • 从kafka读取数据作为数据输入
  • 根据计算输出到另一个kafka主题中
  • 将后面kafka主题中的内容存储到数据库。
    根据这个目的,我设计了这次的demo

设计

计算场景设计

我的实际的主干场景是有一个上报数据的无限流,然后根据配置以及数据的内容,将数据转换为业务粒度的数据投入到后续的数据队列中。在这里,转换前和转换后的数据都是需要进行原样保存的。
这里有一些需要被测试实现的点:

  • 从kafka的指定topic中读取数据流,topic名称:data-upload-stream
  • 将数据进行计算后投入到新的流中,两个流的转换并不是一对一的。topic名称:transformed-data-stream
  • 根据可以动态变更的配置进行计算
  • 将数据存储到postgre中
  • 如果可以,使用现成的connector或者自定义connector

实现步骤

由于这里存在不少没用过的东西,无论技术是否测试成功,相应的问题还是要解决的。所以,我们从最小的陌生技术集合开始引入,我们分这么几个测试步骤。

数据拉取和投递

该过程,主要测试核心的从kafka拉取数据,并向kafka投递数据。

  1. 数据发送模拟由一个springmvc项目进行模拟,调用一次接口发送一次数据,以此可控得模拟数据发送
  2. 从kafka中拉取数据,进行转换后(这里转换规则先写为固定的)投递到另一个主题中

持久化数据

分别从刚才两个主题中拉取数据,并持久化到postgre数据库中。虽然之后最终的持久化容器计划是hbase,但是这里先对postgre进行测试,等本次例子全部完成后再规划对hbase的相关探索。

动态调整配置

这里数据转换的规则是根据配置进行改变的,故这里的配置需要可以动态改变的。根据了解,这里是根据其支持的Broadcast State功能进行支持的。

实现

数据源投递端

这里因为需要和计算端共享数据结构和逻辑,我这里设计了两个项目:

  • flink.common:公共项目,用来共享代码逻辑
  • flink-mockdata:数据模拟项目,仅实现/mock/push接口
    关于kafka的java操作这里就不细述了,因为最近系统性得学习了下kafka的使用关于kafka从调用到运维的细节还需要重新梳理一遍,这里就先完成功能了。
    另外,需要创建两个主题,kafka操作命令如下:
bin/kafka-topics.sh --zookeeper 192.168.137.11:2181 --create --topic data-upload-stream  --partitions 1 --replication-factor 1
bin/kafka-topics.sh --zookeeper 192.168.137.11:2181 --create --topic transformed-data-stream   --partitions 1 --replication-factor 1

需要说明的有这么两点:

  • 我使用的是kafka2.1版本,对于低于2.2版本的kafka在创建topic的时候需要指明的参数是zookeeper的地址,而大于等于2.2版本的则需要使用--bootstrap-server指明broker的地址
  • 在创建kafka的topic的时候如果使用下划线(_)或者点号(.)作连接符的话则会弹出一个警告信息,即这两个符号相冲突,不可以同时使用。所以这里用减号作连接符。

代码投递完,使用如下命令在服务端消费并查看相关的消息:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.11:9092 --topic data-upload-stream --from-beginning

发送者的主要代码我粘贴如下,仅供demo使用,详细参数设定尚未调试:

    /**
     * 初始化kafka模块,初始化之后消费者才会生效
     */
    public void init(){

        //创建生产者
        Properties props = new Properties();
        props.put("bootstrap.servers",abstractKafkaConfig.getBootstrapServers() );
        props.put("acks", "all");
        props.put("delivery.timeout.ms", abstractKafkaConfig.getDeleveryTimeoutMS());
        props.put("request.timeout.ms",abstractKafkaConfig.getRequestTimeoutMS());
        props.put("batch.size", 16384);
        props.put("linger.ms", abstractKafkaConfig.getLingerMS()); //发送的延迟时间
        props.put("buffer.memory", abstractKafkaConfig.getBufferMemory());
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        this.producer = new KafkaProducer<>(props);

    }

    /**
     * 发送消息
     * @param topicName 目标topic的名称
     * @param key 消息的key
     * @param msg 消息的内容
     */
    public void sendMessage(String topicName,String key,String msg){
        logger.info("kafka-producer:[topic="+topicName+",key="+key+",value="+msg+"]");
        producer.send(new ProducerRecord<String, String>(topicName, key, msg));
    }

这是两个主要的方法,剩下的自己相应得补齐即可。注意,我把消费者相关的代码删除了。

数据计算端

计算端是这次demo的重点,总体上来说,它的任务分为下面这么几点:

  1. 从kafka读取到数据
  2. 从对读取到的数据进行计算
  3. 将数据流写入到另一个kafka队列
  4. 将计算规则配置化,并可以动态变更

这些目标我可以分为三个步骤来实现:

  1. 实现flink计算任务对kafka的读取和写入
  2. 实现对流的计算
  3. 实现流计算的动态配置化

接下来我们一步一步得实现这个目标。

实现flink计算任务对kafka的读取和写入

这一步似乎是我最不敢跨出去的一步,我也不知道为什么。我感觉,完全不知道flink对这件事的逻辑,connector该怎么使用什么的。但是,当我听完kafka的课程,硬啃官方文档之后,还是进行了尝试。

选择flink kafka connector

由于对kafka进行了系统性的学习,发现kafka其实还很年轻。它成长很快,导致其早期版本依然有大量用户,但是其早期版本相较现在的版本变化很大。所以,在flink kafka connector中针对不同的kafka版本提出了不同的连接器。具体的信息可以查看官网: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html 这里我们选择的是:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.10.0</version>
</dependency>
从kafka读取数据

先上代码吧

//定义kafka服务相关属性,生产者和消费者均会使用
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.137.11:9092");
// only required for Kafka 0.8
        properties.setProperty("zookeeper.connect", "192.168.137.11:2181");
        properties.setProperty("group.id", "TrasformToMonitorData");

        //创建一个消费者,从kafka队列读取消息
        FlinkKafkaConsumer consumer=new FlinkKafkaConsumer("data-upload-stream", new SimpleStringSchema(), properties);
        //设置从队列第一条进行读取
        consumer.setStartFromEarliest();

        //将读取到的数据导入到一个流中
        DataStream<String> stream = env.addSource(consumer);

这个定义的过程其实还是蛮简单的,构建一个properties,这应该就是kafka客户端本身需要的那个。然后,定义一个FlinkKafkaConsumer,给出主题名称,序列化方式,把properties丢进去。注意,为了方便调试demo,我这里设置成总是从第一条进行读取,然后就是将这个 consumer作为Source,导入到一个流中。就完成了全部操作。这个时候我们可以直接将stream中的内容打印出来,就可以看到了。

将数据塞入kafka主题

为了解决后顾之忧,我要先测试下如何向主题塞入数据。从结果上来说,蛮简单的,如下:

//创建一个kafka生产者
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
        "transformed-data-stream"
        ,new ProducerStringSerializationSchema("transformed-data-stream")
        , properties
        ,FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);   // serialization schema

//将数据流导入到生产者中
stream.addSink(myProducer);

但是,这个过程其实蛮费劲的。因为官网上给出的例子并不是这样的,官网上是这么写的:



DataStream<String> stream = ...;

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new SimpleStringSchema());   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);

stream.addSink(myProducer);

我把011去掉后,发现,这个构造函数被抛弃了,如果刚学一个东西就把人家准备抛弃的东西奉为长期使用的东西,其实还是蛮揪心的。于是我看了下其构造函数的源码。发现,其实这个类并没有被抛弃,它给出了一组新的构造函数。没抛弃的构造函数的所有参数我都能搞定,除了KafkaSerializationSchema。我没有找到任何一个它的实现类,而我所要的不过是一个简单的String的实现。经过百度和谷歌两位大神的指点,我的结论是,只能自定义一个,而我刚好也在网上找到了一段可以直接用的源代码: https://stackoverflow.com/questions/58644549/how-to-implement-flinkkafkaproducer-serializer-for-kafka-2-2

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String> {

    private String topic;

    public ProducerStringSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
    }

}

然后,我就成功了。

实现对流的计算

刚开始,对于流计算的原语不是很了解,觉得眼前一抹黑。后来,度娘后,啃了下分词的小例子,豁然开朗,我就知道我的计算该怎么写了。
我要用到的,就是基本的数据转换,大概分为如下:

  • Map:接受一个元素,输出一个元素。MapFunction<T,V>中T代表输入数据类型(map方法的参数类型),V代表操作结果输出类型(map方法返回数据类型)。
  • flatMap:输入一个元素,输出0个、1个或多个元素。FlatMapFunction<T,V>中T代表输入元素数据类型(flatMap方法的第一个参数类型),V代表输出集合中元素类型(flatMap中的Collector类型参数)
  • filter:过滤指定元素数据,如果返回true则该元素继续向下传递,如果为false则将该元素过滤掉。FilterFunction<T>中T代表输入元素的数据类型。
  • keyBy:逻辑上将数据流元素进行分区,具有相同key的记录将被划分到同一分区。指定Key的方式有多种,这个我们在之前说过了。返回类型KeyedStream<T,KEY>中T代表KeyedStream中元素数据类型,KEY代表虚拟KEY的数据类型。
  • reduce:对指定的“虚拟”key相同的记录进行滚动合并,也就是当前元素与最后一次的reduce结果进行reduce操作。ReduceFunction<T>中的T代表KeyStream中元素的数据类型。
  • Aggregations:滚动聚合具有相同key的数据流元素,我们可以指定需要聚合的字段(field)。DataStream<T>中的T为聚合之后的结果。

蛮多的,在官网没找到系统说这些的,可能是我英文差吧,找到了个说的蛮全的帖子,有空详细学习一下: https://www.jianshu.com/p/fc4fb559d9f4

总之,我实现的代码如下:

        DataStream<String> sequenceDataStream=stream.flatMap(new FlatMapFunction<String,String>(){
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                if(ValidateUtil.strNotEmptyWithTrim(value)) {
                    UploadData uploadData = JsonUtil.json2Obj(value, UploadData.class);
                    if (uploadData != null && uploadData.getData()!=null) {
                        for(Integer dataIndex : uploadData.getData().keySet()){
                            UploadDataUnit unitData=uploadData.getData().get(dataIndex);

                            SequenceData sequenceData=new SequenceData();
                            sequenceData.setUploadDataId(uploadData.getUploadDataId());
                            sequenceData.setMonitorDataId(1L);//需要根据绑定关系获取该ID
                            sequenceData.setUploadtime(uploadData.getUploadtime());
                            sequenceData.setValueType("test_sequence");//需要根据MonitorDataId获取该type值
                            sequenceData.setValue(Integer.valueOf(String.valueOf(unitData.getValue())));//需要根据数据类型进行数据转换

                            out.collect(JsonUtil.obj2Json(sequenceData));
                        }

                    }
                }
            }
        });

需要解释下,之前我data-upload-stream中读取的是UploadData结构的数据,它里面的data字段可能包含多个数据,每一个是一个transformed-data-stream中的数据,这里面就存在个一转多的问题。所以,我使用了一个flatMap转换,在里面详细定义了转换的规则。在后面,将我sequenceDataStream的内容定义到新的队列输出,就实现了转换。

实现计算配置动态化

这一点我之前认为,flink在计算过程中,由于其线程的生成和管理都是flink运行时管理的,所以就以为需要通过其状态管理机制对这种配置进行管理。后来就详细阅读了其对状态管理的体系设计,发现,它主要是针对在计算过程中,由于计算而产生的状态的管理。而像我这种纯粹是对计算规则的配置化只字未提。所以,我们就尝试下,让它在计算过程中直接从redis中读取数据。
下面是一些代码片段:

创建jedispool
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxWaitMillis(10000);
config.setMaxTotal(50);
config.setMaxIdle(50);
jedisPool = new JedisPool(config, "192.168.137.11", 6379);

使用jedis
Jedis jedis=jedisPool.getResource();
……
Integer test= Integer.valueOf(jedis.get("testValue"));
……
jedis.close();

这里单纯做测试之用,写的就比较捡漏。可喜的是,成功了。需要注意的是,jedisPool需要是可以全局获取的对象,而不可以是临时变量。

相关文章

网友评论

      本文标题:flink实用demo——kafka数据转换

      本文链接:https://www.haomeiwen.com/subject/devmfhtx.html