美文网首页大数据
Kafka 中使用 Avro 序列化组件(三):Confluen

Kafka 中使用 Avro 序列化组件(三):Confluen

作者: CoderJed | 来源:发表于2018-06-25 13:19 被阅读3925次

    1. schema 注册表

    无论是使用传统的Avro API自定义序列化类和反序列化类还是使用Twitter的Bijection类库实现Avro的序列化与反序列化,这两种方法都有一个缺点:在每条Kafka记录里都嵌入了schema,这会让记录的大小成倍地增加。但是不管怎样,在读取记录时仍然需要用到整个 schema,所以要先找到 schema。有没有什么方法可以让数据共用一个schema?

    我们遵循通用的结构模式并使用"schema注册表"来达到目的。"schema注册表"的原理如下:

    把所有写入数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema 的 ID。负责读取数据的应用程序使用 ID 从注册表里拉取 schema 来反序列化记录。序列化器和反序列化器分别负责处理 schema 的注册和拉取。

    schema注册表并不属于Kafka,现在已经有一些开源的schema 注册表实现。比如本文要讨论的Confluent Schema Registry。

    2. 案例说明

    现有 schema 文件 user.json,其中内容如下:

    {
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "id", "type": "int"},
            {"name": "name",  "type": "string"},
            {"name": "age", "type": "int"}
        ]
    }
    

    需求:把这个 schema 中的内容注册到 Confluent Schema Registry 中,Kafka Producer 和 Kafka Consumer 通过识别 Confluent Schema Registry 中的 schema 内容来序列化和反序列化。

    3. 实操步骤

    (1) 启动 Confluent Schema Registry 服务

    • Confluent 下载地址:https://www.confluent.io/download/,我这里使用confluent-oss-4.1.1-2.11.tar.gz
    • 下载好后上传到服务器,解压即可用
    • 进入confluent-4.1.1/etc/schema-registry/目录下,修改schema-registry.properties文件,内容及注释如下:
    # Confluent Schema Registry 服务的访问IP和端口
    listeners=http://192.168.42.89:8081
    
    # Kafka集群所使用的zookeeper地址,如果不配置,会使用Confluent内置的Zookeeper地址(localhost:2181)
    kafkastore.connection.url=192.168.42.89:2181/kafka-1.1.0-cluster
    
    # Kafka集群的地址(上一个参数和这个参数配置一个就可以了)
    # kafkastore.bootstrap.servers=192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094
    
    # 存储 schema 的 topic
    kafkastore.topic=_schemas
    
    # 其余保持默认即可
    
    • 启动 Confluent Schema Registry
    [root@confluent confluent-4.1.1]# bin/schema-registry-start etc/schema-registry/schema-registry.properties
    # 省略一些内容......
    [2018-06-22 16:10:26,442] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)
    

    (2) 注册 User 的 schema 注册到对应的 topic 下

    • 首先把原来的 schema 文件加上 "schema" 标记
    {
        "schema": "{
            "type": "record",
            "name": "User",
            "fields": [
                {"name": "id", "type": "int"},
                {"name": "name",  "type": "string"},
                {"name": "age", "type": "int"}
            ]
        }"
    }
    
    • 部分"需要转义:
    {
        "schema": "{
            \"type\": \"record\",
            \"name\": \"User\",
            \"fields\": [
                {\"name\": \"id\", \"type\": \"int\"},
                {\"name\": \"name\",  \"type\": \"string\"},
                {\"name\": \"age\", \"type\": \"int\"}
            ]
        }"
    }
    
    • 注册 schema 的命令如下
    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '' \ 
    http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions
    

    说明:
    <1> ''之间需要填写schema字符串
    <2> 我用来测试的 topic 为 dev3-yangyunhe-topic001,而且我只对 Kafka 的 value 进行 avro 的序列化,所以注册的地址为http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions
    <3> http://192.168.42.89:8081需要根据自己的配置进行修改

    • 把转义后 schema 填充到 --data ''的两个单引号中
    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\",  \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' \
    http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions
    
    • 注册成功会返回这个 schema 的 ID
    {"id":102}
    

    (3) 在 maven 工程中引入 Confluent Schema Registry 相关的 jar 包

    这些 jar 包在 maven 仓库中下载不到,需要自己手动添加到集群中,confluent-4.1.1 解压后,其 share/java/目录下有 confluent 各个组件的 jar 包:

    我们需要 confluent-common 目录下的common-config-4.1.1.jarcommon-utils-4.1.1.jar和全部以jackson开头的 jar 包以及 kafka-serde-tools 目录下的kafka-schema-registry-client-4.1.1.jarkafka-avro-serializer-4.1.1.jar,关于如何添加本地的 jar 包到 java 工程中,本文不再赘述。

    (4) Kafka Producer 发送数据

    package com.bonc.rdpe.kafka110.producer;
    
    import java.util.Properties;
    import java.util.Random;
    
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    /**
     * @Title ConfluentProducer.java 
     * @Description 使用Confluent实现的Schema Registry服务来发送Avro序列化后的对象
     * @Author YangYunhe
     * @Date 2018-06-25 10:49:19
     */
    public class ConfluentProducer {
        
        public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " + 
                "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " + 
                "{\"name\": \"name\",  \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";
        
        public static void main(String[] args) throws Exception {
            
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // 使用Confluent实现的KafkaAvroSerializer
            props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
            // 添加schema服务的地址,用于获取schema
            props.put("schema.registry.url", "http://192.168.42.89:8081");
    
            Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
            
            Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse(USER_SCHEMA);
            
            Random rand = new Random();
            int id = 0;
    
            while(id < 100) {
                id++;
                String name = "name" + id;
                int age = rand.nextInt(40) + 1;
                GenericRecord user = new GenericData.Record(schema);
                user.put("id", id);
                user.put("name", name);
                user.put("age", age);
                
                ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("dev3-yangyunhe-topic001", user);
                
                producer.send(record);
                Thread.sleep(1000);
            }
    
            producer.close();
    
        }
    
    }
    

    (5) Kafka Consumer 消费数据

    package com.bonc.rdpe.kafka110.consumer;
    
    import java.util.Collections;
    import java.util.Properties;
    
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    /**
     * @Title ConfluentConsumer.java
     * @Description 使用Confluent实现的Schema Registry服务来消费Avro序列化后的对象
     * @Author YangYunhe
     * @Date 2018-06-25 11:42:21
     */
    public class ConfluentConsumer {
    
        public static void main(String[] args) throws Exception {
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
            props.put("group.id", "dev3-yangyunhe-group001");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 使用Confluent实现的KafkaAvroDeserializer
            props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            // 添加schema服务的地址,用于获取schema
            props.put("schema.registry.url", "http://192.168.42.89:8081");
            KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    
            consumer.subscribe(Collections.singletonList("dev3-yangyunhe-topic001"));
    
            try {
                while (true) {
                    ConsumerRecords<String, GenericRecord> records = consumer.poll(1000);
                    for (ConsumerRecord<String, GenericRecord> record : records) {
                        GenericRecord user = record.value();
                        System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
                                + user.get("name") + ", " + "user.age = " + user.get("age") + "], "
                                + "partition = " + record.partition() + ", " + "offset = " + record.offset());
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }
    

    (6) 测试结果

    Kafka Consumer 的控制台输出内容如下:

    value = [user.id = 1, user.name = name1, user.age = 20], partition = 1, offset = 696
    value = [user.id = 2, user.name = name2, user.age = 27], partition = 0, offset = 696
    value = [user.id = 3, user.name = name3, user.age = 35], partition = 2, offset = 695
    value = [user.id = 4, user.name = name4, user.age = 7], partition = 1, offset = 697
    value = [user.id = 5, user.name = name5, user.age = 34], partition = 0, offset = 697
    
    ......
    

    相关文章

      网友评论

        本文标题:Kafka 中使用 Avro 序列化组件(三):Confluen

        本文链接:https://www.haomeiwen.com/subject/btnkyftx.html