Kafka Schema Registry 使用教程(JAVA)
如果没有 Schema Registry服务,自定义的schema需要在数据生产端和数据消费端都保存,有了 Schema Registry服务,数据生成方设置好Schema,会自动注册到 registry服务中,这样数据消费方就不需要保存schema,直接消费就可以。
实例以kafka 0.10.2.0 版本举例,kafka和Schema Registry的部署步骤在此省略。
-
相关jar包在 Maven仓库中下载不到,所以需要在maven的setting文件添加如下信息:
<mirrors> <mirror> <id>confluent</id> <mirrorOf>confluent</mirrorOf> <name>Nexus public mirror</name> <url>http://packages.confluent.io/maven/</url> </mirror> ... </mirrors> <profiles> <profile> <repository> <id>confluent</id> <url>http://packages.confluent.io/maven/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> ... </profile> </profiles>
-
在maven中引入依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>3.1.1</version> </dependency>
-
生成数据
import io.confluent.kafka.serializers.KafkaAvroSerializer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaSchemaProducer { public static final String SCHEMA = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\", \"type\": \"string\"}, {\"name\":\"age\", \"type\": \"int\"}, {\"name\":\"sex\",\"type\": \"string\"}, {\"name\":\"comment\",\"type\": \"string\"}]}"; public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); properties.put("schema.registry.url", "http://localhost:8081"); Producer<String, GenericRecord> producer = new KafkaProducer<>(properties); Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(SCHEMA); ProducerRecord producerRecord; for (int i = 0; i < 50; i++) { GenericRecord record = new Record(schema); record.put("id", i); record.put("name", "name" + i); record.put("age", i); record.put("sex", "male"); record.put("comment", "comment" + i); producerRecord = new ProducerRecord("kyle-test-schema", record); producer.send(producerRecord); } producer.flush(); System.out.println("complete"); producer.close(); } }
-
消费数据
import io.confluent.kafka.serializers.KafkaAvroDeserializer; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Properties; public class KafkaSchemaConsumer { public static void main(String[] args) { Properties properties = new Properties(); properties.put("schema.registry.url", "http://localhost:8081"); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kyle-test"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList("kyle-test-schema")); ConsumerRecords<String, GenericRecord> records; try { while(true) { records = consumer.poll(1000); for (ConsumerRecord<String, GenericRecord> record : records) { GenericRecord user = record.value(); System.out.println("id: " + user.get("id") + ", name: " + user.get("name") + ", age: " + user.get("age") + ", comment: " + user.get("comment") + ", sex: " + user.get("sex")); } } } catch (Exception e){ e.printStackTrace(); } finally { consumer.close(); } } }
-
消费结果如下
id: 6, name: name6, age: 6, comment: comment6, sex: male id: 16, name: name16, age: 16, comment: comment16, sex: male id: 26, name: name26, age: 26, comment: comment26, sex: male id: 36, name: name36, age: 36, comment: comment36, sex: male id: 46, name: name46, age: 46, comment: comment46, sex: male id: 4, name: name4, age: 4, comment: comment4, sex: male id: 14, name: name14, age: 14, comment: comment14, sex: male ...
可以添加作者微信进行相互学习交流,还请填写备注信息。
网友评论