美文网首页Apache KafkaConfluent 组件
Kafka Schema Registry 使用教程(JAVA)

Kafka Schema Registry 使用教程(JAVA)

作者: 扶我起来改bug | 来源:发表于2019-08-15 15:35 被阅读2次

Kafka Schema Registry 使用教程(JAVA)

如果没有 Schema Registry服务,自定义的schema需要在数据生产端和数据消费端都保存,有了 Schema Registry服务,数据生成方设置好Schema,会自动注册到 registry服务中,这样数据消费方就不需要保存schema,直接消费就可以。

实例以kafka 0.10.2.0 版本举例,kafka和Schema Registry的部署步骤在此省略。

  1. 相关jar包在 Maven仓库中下载不到,所以需要在maven的setting文件添加如下信息:

    <mirrors>
     <mirror>
             <id>confluent</id>
             <mirrorOf>confluent</mirrorOf>
             <name>Nexus public mirror</name>
             <url>http://packages.confluent.io/maven/</url>
     </mirror>
     ...
    </mirrors>
    
    <profiles>
     <profile>
         <repository>
             <id>confluent</id>
             <url>http://packages.confluent.io/maven/</url>
             <releases>
                 <enabled>true</enabled>
             </releases>
             <snapshots>
                 <enabled>true</enabled>
             </snapshots>
         </repository>
         ...
     </profile>
    </profiles>
    
  2. 在maven中引入依赖

    <dependency>
     <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
     <version>3.1.1</version>
    </dependency>
    
  3. 生成数据

    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData.Record;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaSchemaProducer {
    
        public static final String SCHEMA = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\", \"type\": \"string\"}, {\"name\":\"age\", \"type\": \"int\"}, {\"name\":\"sex\",\"type\": \"string\"}, {\"name\":\"comment\",\"type\": \"string\"}]}";
    
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
            properties.put("schema.registry.url", "http://localhost:8081");
    
            Producer<String, GenericRecord> producer = new KafkaProducer<>(properties);
    
            Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse(SCHEMA);
    
            ProducerRecord producerRecord;
            for (int i = 0; i < 50; i++) {
                GenericRecord record = new Record(schema);
                record.put("id", i);
                record.put("name", "name" + i);
                record.put("age", i);
                record.put("sex", "male");
                record.put("comment", "comment" + i);
                producerRecord = new ProducerRecord("kyle-test-schema", record);
                producer.send(producerRecord);
            }
            producer.flush();
            System.out.println("complete");
            producer.close();
        }
    }
    
  4. 消费数据

    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class KafkaSchemaConsumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("schema.registry.url", "http://localhost:8081");
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kyle-test");
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    
            KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Arrays.asList("kyle-test-schema"));
            ConsumerRecords<String, GenericRecord> records;
            try {
                while(true) {
                    records = consumer.poll(1000);
                    for (ConsumerRecord<String, GenericRecord> record : records) {
                        GenericRecord user = record.value();
                        System.out.println("id: " + user.get("id") + ", name: " + user.get("name") + ", age: " + user.get("age") + ", comment: " + user.get("comment") + ", sex: " + user.get("sex"));
                    }
                }
            } catch (Exception e){
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    }
    
  5. 消费结果如下

    id: 6, name: name6, age: 6, comment: comment6, sex: male
    id: 16, name: name16, age: 16, comment: comment16, sex: male
    id: 26, name: name26, age: 26, comment: comment26, sex: male
    id: 36, name: name36, age: 36, comment: comment36, sex: male
    id: 46, name: name46, age: 46, comment: comment46, sex: male
    id: 4, name: name4, age: 4, comment: comment4, sex: male
    id: 14, name: name14, age: 14, comment: comment14, sex: male
    ...
    

可以添加作者微信进行相互学习交流,还请填写备注信息。

image

相关文章

网友评论

    本文标题:Kafka Schema Registry 使用教程(JAVA)

    本文链接:https://www.haomeiwen.com/subject/tmmzjctx.html