美文网首页
Kafka, avro序列化

Kafka, avro序列化

作者: halfempty | 来源:发表于2021-10-28 10:26 被阅读0次

借助confluent的schema-registry服务, 实现生产者与消费者共享schema

要点

  • 使用confluent的序列化类io.confluent.kafka.serializers.KafkaAvroSerializer
  • 指定schema服务地址schema.registry.url

1. 准备

添加maven依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.2</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.2</version>
        </dependency>
    </dependencies>

注意
引用kafka-avro-serializer时, Maven中央仓库中找不到该构件, 需要添加confluent远程仓库

   <repositories>
       <repository>
           <id>confluent</id>
           <url>https://packages.confluent.io/maven/</url>
       </repository>
   </repositories>

2. 生产者

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class MyProducer {

    // Avro schema for the User record; the Confluent serializer registers it
    // with the schema registry on first use so consumers can fetch it.
    private final static String SCHEMA = "{\n" +
            "    \"type\": \"record\",\n" +
            "    \"name\": \"User\",\n" +
            "    \"fields\": [\n" +
            "        {\"name\": \"name\", \"type\": \"string\"},\n" +
            "        {\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]},\n" +
            "        {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" +
            "    ]\n" +
            "}";

    // Sample pool for the favorite_color field.
    private final static String[] COLORS = {"red", "blue", "yellow", "white"};

    /**
     * Produces 10 random User records to the "unitopic" topic, serialized
     * as Avro via the Confluent schema registry.
     */
    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "172.31.138.83:6667");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaAvroSerializer needs schema.registry.url to register/look up schemas.
        properties.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        properties.put("schema.registry.url", "http://172.31.138.104:58088");

        Schema schema = new Schema.Parser().parse(SCHEMA);
        Random random = new Random();
        String topic = "unitopic";

        // try-with-resources: the producer is flushed and closed even if
        // schema parsing of a record or send() throws (the original leaked
        // the producer on any exception before close()).
        try (Producer<String, GenericRecord> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                GenericRecord user = new GenericData.Record(schema);
                user.put("name", "刘" + i);
                user.put("favorite_number", random.nextInt(100));
                user.put("favorite_color", COLORS[random.nextInt(COLORS.length)]);

                ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, user);
                // Report per-record delivery failures instead of dropping them silently.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
            }
        }
    }
}

3. 消费者

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class MyConsumer {

    /**
     * Consumes Avro-encoded User records from "unitopic" and prints each
     * deserialized value; schemas are resolved through the schema registry.
     */
    public static void main(String[] args) {

        Properties config = new Properties();
        config.put("bootstrap.servers", "172.31.138.83:6667");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // KafkaAvroDeserializer fetches the writer schema from the registry.
        config.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        config.put("schema.registry.url", "http://172.31.138.104:58088");
        config.put("group.id", "cg-101");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Collections.singletonList("unitopic"));

        try {
            // Poll forever; print the Avro value of every record in each batch.
            while (true) {
                ConsumerRecords<String, GenericRecord> batch = consumer.poll(100);
                batch.forEach(rec -> System.out.println(rec.value()));
            }
        } finally {
            // Release network resources and leave the consumer group cleanly.
            consumer.close();
        }
    }
}

相关文章

  • kafka avro序列化读写消息

    kafka avro序列化读写消息 avro是Hadoop的一个子项目,由Hadoop的创始人Doug Cutti...

  • kafka avro 序列化

    kafka avro 序列化有多种实现方式: 使用 io.confluent 需要设置 schema Regist...

  • Kafka, avro序列化

    借助confluent的schema-registry服务, 实现生产者与消费者共享schema 要点使用conf...

  • 记一次Avro序列化bug

    bug描述 Avro 序列化 Event长度超过63后 反序列化失败 问题定位 1.程序中将avro序列化后的by...

  • Avro JSON 序列化

    摘要 Schema究竟是什么,Avro和JSON的关系,Avro的序列化与反序列化,对象容器文件。 Schema究...

  • Avro

    Avro[http://avro.apache.org/]是一种与编程语言无关的序列化格式,Avro 数据通过与语...

  • Hadoop权威指南-ch4 Hadoop的I/O(3) Avr

    注:本文涉及书中4.4小结 数据序列化系统Avro Apache Avro是一个独立于编程语言的数据序列化系统,旨...

  • Kafka使用Avro序列化data

    如果您正开始使用Kafka,那么您需要做的一件事就是选择一种数据格式。最重要的是在使用过程中保持一致。任何格...

  • Kafka使用Apache Avro序列化

    1,什么是序列化和反序列化 当需要将数据存入文件或者通过网络发送出去时,需将数据对象转化为字节流,即对数据序列化,...

  • Avro 对象序列化与反序列化,及转Json对象序列化处理

    Avro 工具类 序列化与反序列化 public class AvroHelper { public byte[]...

网友评论

      本文标题:Kafka, avro序列化

      本文链接:https://www.haomeiwen.com/subject/esgmaltx.html