美文网首页
Flink 自定义Avro序列化(Source/Sink)到ka

Flink 自定义Avro序列化(Source/Sink)到ka

作者: 大数据老哥 | 来源:发表于2021-01-08 22:19 被阅读0次


    前言

    环境所依赖的pom文件

     <dependencies>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.8.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.12</artifactId>
                <version>1.10.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.12</artifactId>
                <version>1.10.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
                <version>1.10.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-avro</artifactId>
                <version>1.10.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>1.0.0</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro-maven-plugin</artifactId>
                    <version>1.8.2</version>
                    <executions>
                        <execution>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>schema</goal>
                            </goals>
                            <configuration>
                                <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>

    一、Avro提供的技术支持包括以下五个方面:

    • 优秀的数据结构;
    • 一个紧凑的,快速的,二进制数据格式;
    • 一个容器文件,用来存储持久化数据;
    • RPC远程过程调用;
    • 集成最简单的动态语言。读取或者写入数据文件,使用或实现RPC协议均不需要代码实现。对于静态- - 语言编写的话需要实现;

    二、Avro优点

    • 二进制消息,性能好/效率高
    • 使用JSON描述模式
    • 模式和数据统一存储,消息自描述,不需要生成stub代码(支持生成IDL)
    • RPC调用在握手阶段交换模式定义
    • 包含完整的客户端/服务端堆栈,可快速实现RPC
    • 支持同步和异步通信
    • 支持动态消息
    • 模式定义允许定义数据的排序(序列化时会遵循这个顺序)
    • 提供了基于Jetty内核的服务基于Netty的服务

    三、Avro Json格式介绍

    {
        "namespace": "com.avro.bean",
        "type": "record",
        "name": "UserBehavior",
        "fields": [
            {"name": "userId", "type": "long"},
            {"name": "itemId",  "type": "long"},
            {"name": "categoryId", "type": "int"},
            {"name": "behavior", "type": "string"},
            {"name": "timestamp", "type": "long"}
        ]
    }
    • namespace : 要生成的目录
    • type :类型 avro 使用 record
    • name : 会自动生成对应的对象
    • fields : 要指定的字段

    注意: 创建的文件后缀名一定要叫 avsc

    四、使用Java自定义序列化到kafka

             首先我们先使用 Java编写Kafka客户端写入数据和消费数据。

    4.1 准备测试数据

    543462,1715,1464116,pv,1511658000662867,2244074,1575622,pv,1511658000561558,3611281,965809,pv,1511658000894923,3076029,1879194,pv,1511658000834377,4541270,3738615,pv,1511658000315321,942195,4339722,pv,1511658000625915,1162383,570735,pv,1511658000

    4.2 自定义Avro 序列化和反序列化

    首先我们需要实现2个类分别为SerializerDeserializer分别是序列化和反序列化

    package com.avro.AvroUtil;

    import com.avro.bean.UserBehavior;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serializer;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Map;

    /**
     * @author 大数据老哥
     * @version V1.0
     * @Package com.avro.AvroUtil
     * @File :SimpleAvroSchemaJava.java
     * @date 2021/1/8 20:02 */
    /**
     *  自定义序列化和反序列化 */
    public class SimpleAvroSchemaJava implements Serializer<UserBehavior>, Deserializer<UserBehavior> {
        
        @Override
        public void configure(Map<String, ?> map, boolean b) {

        }
        //序列化方法
        @Override
        public byte[] serialize(String s, UserBehavior userBehavior) {
            // 创建序列化执行器
            SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<UserBehavior>(userBehavior.getSchema());
             // 创建一个流 用存储序列化后的二进制文件
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            // 创建二进制编码器
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
            try {
                // 数据入都流中
                writer.write(userBehavior, encoder);
            } catch (IOException e) {
                e.printStackTrace();
            }

            return out.toByteArray();
        }

        @Override
        public void close() {

        }

        //反序列化
        @Override
        public UserBehavior deserialize(String s, byte[] bytes) {
            // 用来保存结果数据
            UserBehavior userBehavior = new UserBehavior();
            // 创建输入流用来读取二进制文件
            ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
            // 创建输入序列化执行器
            SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<UserBehavior>(userBehavior.getSchema());
            //创建二进制解码器
            BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
            try {
                // 数据读取
                userBehavior=stockSpecificDatumReader.read(null, binaryDecoder);
            } catch (IOException e) {
                e.printStackTrace();
            }
            // 结果返回
            return userBehavior;
        }
    }

    4.3 创建序列化对象

    package com.avro.kafka;
    import com.avro.bean.UserBehavior;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    /**
     * @author 大数据老哥
     * @version V1.0
     * @Package com.avro.kafka
     * @File :UserBehaviorProducerKafka.java
     * @date 2021/1/8 20:14 */

    public class UserBehaviorProducerKafka {
        public static void main(String[] args) throws InterruptedException {
            // 获取数据
            List<UserBehavior> data = getData();
            // 创建配置文件
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
            props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty("value.serializer", "com.avro.AvroUtil.SimpleAvroSchemaJava");
            // 创建kafka的生产者
            KafkaProducer<String, UserBehavior> userBehaviorProducer = new KafkaProducer<String, UserBehavior>(props);
            // 循环遍历数据
            for (UserBehavior userBehavior : data) {
                ProducerRecord<String, UserBehavior> producerRecord = new ProducerRecord<String, UserBehavior>("UserBehaviorKafka", userBehavior);
                userBehaviorProducer.send(producerRecord);
                System.out.println("数据写入成功"+data);
                Thread.sleep(1000);
            }
        }

        public static List<UserBehavior> getData() {
            ArrayList<UserBehavior> userBehaviors = new ArrayList<UserBehavior>();
            try {
                BufferedReader br = new BufferedReader(new FileReader(new File("data/UserBehavior.csv")));
                String line = "";
                while ((line = br.readLine()) != null) {
                    String[] split = line.split(",");
                 userBehaviors.add( new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4])));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return userBehaviors;
        }
    }

    注意:value.serializer 一定要指定我们自己写好的那个反序列化类,负责会无效

    4.4 创建反序列化对象

    package com.avro.kafka;
    import com.avro.bean.UserBehavior;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.util.Arrays;
    import java.util.Properties;

    /**
     * @author 大数据老哥
     * @version V1.0
     * @Package com.avro.kafka
     * @File :UserBehaviorConsumer.java
     * @date 2021/1/8 20:58 */
    public class UserBehaviorConsumer {

        public static void main(String[] args) {
            Properties prop = new Properties();
            prop.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
            prop.put("group.id", "UserBehavior");
            prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 设置反序列化类为自定义的avro反序列化类
            prop.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaJava");
            KafkaConsumer<String, UserBehavior> consumer = new KafkaConsumer<String, UserBehavior>(prop);
            consumer.subscribe(Arrays.asList("UserBehaviorKafka"));
            while (true) {
                ConsumerRecords<String, UserBehavior> poll = consumer.poll(1000);
                for (ConsumerRecord<String, UserBehavior> stringStockConsumerRecord : poll) {
                    System.out.println(stringStockConsumerRecord.value());
                }
            }
        }
    }

    4.5 启动运行

    创建kafkaTopic 和启动一个消费者

    # 创建topic
    ./kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic UserBehaviorKafka
    # 模拟消费者
    ./kafka-console-consumer.sh --from-beginning --topic UserBehaviorKafka --zookeeper node01:2181,node02:2node03:2181

    五、Flink 实现Avro自定义序列化到Kafka

             到这里好多小伙们就说我Java实现了那Flink 不就改一下Consumer 和Producer 不就完了吗?

    5.1 准备数据

    543462,1715,1464116,pv,1511658000662867,2244074,1575622,pv,1511658000561558,3611281,965809,pv,1511658000894923,3076029,1879194,pv,1511658000834377,4541270,3738615,pv,1511658000315321,942195,4339722,pv,1511658000625915,1162383,570735,pv,1511658000

    5.2 创建Flink自定义Avro序列化和反序列化


    package com.avro.AvroUtil;

    import com.avro.bean.UserBehavior;
    import com.typesafe.sslconfig.ssl.FakeChainedKeyStore;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serializer;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Map;

    /**
     * @author 大数据老哥
     * @version V1.0
     * @Package com.avro.AvroUtil
     * @File :SimpleAvroSchemaFlink.java
     * @date 2021/1/8 20:02 */

    /**
     *  自定义序列化和反序列化 */
    public class SimpleAvroSchemaFlink implements DeserializationSchema<UserBehavior>, SerializationSchema<UserBehavior> {

     
        @Override
        public byte[] serialize(UserBehavior userBehavior) {
            // 创建序列化执行器
            SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<UserBehavior>(userBehavior.getSchema());
            // 创建一个流 用存储序列化后的二进制文件
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            // 创建二进制编码器
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
            try {
                // 数据入都流中
                writer.write(userBehavior, encoder);
            } catch (IOException e) {
                e.printStackTrace();
            }

            return out.toByteArray();
        }

        @Override
        public TypeInformation<UserBehavior> getProducedType() {
          return TypeInformation.of(UserBehavior.class);
        }

        @Override
        public UserBehavior deserialize(byte[] bytes) throws IOException {
            // 用来保存结果数据
            UserBehavior userBehavior = new UserBehavior();
            // 创建输入流用来读取二进制文件
            ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
            // 创建输入序列化执行器
            SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<UserBehavior>(userBehavior.getSchema());
            //创建二进制解码器
            BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
            try {
                // 数据读取
                userBehavior=stockSpecificDatumReader.read(null, binaryDecoder);
            } catch (IOException e) {
                e.printStackTrace();
            }
            // 结果返回
            return userBehavior;
        }

        @Override
        public boolean isEndOfStream(UserBehavior userBehavior) {
            return false;
        }
    }

    5.3 创建Flink Comsumer 反序列化

    package com.avro.FlinkKafka

    import com.avro.AvroUtil.{SimpleAvroSchemaFlink}
    import com.avro.bean.UserBehavior
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

    import java.util.Properties

    /**
     * @Package com.avro.FlinkKafka
     * @File :UserBehaviorConsumerFlink.java
     * @author 大数据老哥
     * @date 2021/1/8 21:18
     * @version V1.0 */
    object UserBehaviorConsumerFlink {
      def main(args: Array[String]): Unit = {
        //1.构建流处理运行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1) // 设置并行度1 方便后面测试
        // 2.设置kafka 配置信息
        val prop = new Properties
        prop.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092")
        prop.put("group.id", "UserBehavior")
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        // 设置反序列化类为自定义的avro反序列化类
        prop.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaFlink")

        //    val kafka: FlinkKafkaConsumer011[String] =  new FlinkKafkaConsumer011[String]("UserBehaviorKafka", new SimpleStringSchema(), prop)
        // 3.构建Kafka 连接器
        val kafka: FlinkKafkaConsumer011[UserBehavior] = new FlinkKafkaConsumer011[UserBehavior]("UserBehavior", new SimpleAvroSchemaFlink(), prop)

        //4.设置Flink层最新的数据开始消费
        kafka.setStartFromLatest()
        //5.基于kafka构建数据源
        val data: DataStream[UserBehavior] = env.addSource(kafka)
        //6.结果打印
        data.print()
        env.execute("UserBehaviorConsumerFlink")
      }
    }

    5.4 创建Flink Producer 序列化

    package com.avro.FlinkKafka

    import com.avro.AvroUtil.SimpleAvroSchemaFlink
    import com.avro.bean.UserBehavior
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

    import java.util.Properties

    /**
     * @Package com.avro.FlinkKafka
     * @File :UserBehaviorProducerFlink.java
     * @author 大数据老哥
     * @date 2021/1/8 21:38
     * @version V1.0 */
    object UserBehaviorProducerFlink {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val value = env.readTextFile("./data/UserBehavior.csv")
        val users: DataStream[UserBehavior] = value.map(row => {
          val arr = row.split(",")
          val behavior = new UserBehavior()
          behavior.setUserId(arr(0).toLong)
          behavior.setItemId(arr(1).toLong)
          behavior.setCategoryId(arr(2).toInt)
          behavior.setBehavior(arr(3))
          behavior.setTimestamp(arr(4).toLong)
          behavior
        })
        val prop = new Properties()
        prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
        //4.连接Kafka
        val producer: FlinkKafkaProducer011[UserBehavior] = new FlinkKafkaProducer011[UserBehavior]("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop)
        //5.将数据打入kafka
        users.addSink(producer)
        //6.执行任务
        env.execute("UserBehaviorProducerFlink")
      }
    }

    5.5 启动运行

    需要源码的请去GitHub 自行下载  https://github.com/lhh2002/Flink_Avro

    小结

              其实我在实现这个功能的时候也是蒙的,不会难道就不学了吗,肯定不是呀。我在5.2提出的那个问题的时候其实是我自己亲身经历过的。首先遇到了问题不要想着怎么放弃,而是想想怎么解决,当时我的思路看源码看别人写的。最后经过不懈的努力也终成功了,我在这里为大家提供Flink面试题需要的朋友可以去下面GitHub去下载,信自己,努力和汗水总会能得到回报的。我是大数据老哥,我们下期见~~~

    资源获取 获取Flink面试题,Spark面试题,程序员必备软件,hive面试题,Hadoop面试题,Docker面试题,简历模板等资源请去

    GitHub自行下载 https://github.com/lhh2002/Framework-Of-BigData

    Gitee 自行下载 https://gitee.com/li_hey_hey/Framework-Of-BigData

    相关文章

      网友评论

          本文标题:Flink 自定义Avro序列化(Source/Sink)到ka

          本文链接:https://www.haomeiwen.com/subject/talqaktx.html