借助confluent
的schema-registry
服务, 实现生产者与消费者共享schema
要点
- 使用confluent的序列化类
io.confluent.kafka.serializers.KafkaAvroSerializer
- 指定schema服务地址
schema.registry.url
1. 准备
添加maven依赖
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.2</version>
</dependency>
</dependencies>
注意
引用kafka-avro-serializer
时, 中央仓库会提示找不到该构件(artifact), 需要在 pom.xml 中添加 Confluent 远程仓库:
<repositories>
  <repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
  </repository>
</repositories>
2. 生产者
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
/**
 * Demo producer: publishes Avro {@code GenericRecord} "User" messages to Kafka,
 * registering/serializing the value schema through Confluent Schema Registry
 * via {@code KafkaAvroSerializer}.
 */
public class MyProducer {
    // Avro record schema for the "User" value; the registry stores it under
    // the subject derived from the topic name.
    private final static String SCHEMA = "{\n" +
            "  \"type\": \"record\",\n" +
            "  \"name\": \"User\",\n" +
            "  \"fields\": [\n" +
            "    {\"name\": \"name\", \"type\": \"string\"},\n" +
            "    {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n" +
            "    {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" +
            "  ]\n" +
            "}";

    private final static String[] COLORS = {"red", "blue", "yellow", "white"};

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "172.31.138.83:6667");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Confluent serializer: writes a schema-registry id + Avro binary payload.
        properties.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Address of the Schema Registry service shared by producer and consumer.
        properties.put("schema.registry.url", "http://172.31.138.104:58088");

        Schema schema = new Schema.Parser().parse(SCHEMA);
        Random random = new Random();
        String topic = "unitopic";

        // try-with-resources guarantees the producer is flushed and closed even
        // if send() or record construction throws (was leaked on error before).
        try (Producer<String, GenericRecord> producer =
                     new KafkaProducer<String, GenericRecord>(properties)) {
            for (int i = 0; i < 10; i++) {
                GenericRecord user = new GenericData.Record(schema);
                user.put("name", "刘" + i);
                user.put("favorite_number", random.nextInt(100));
                user.put("favorite_color", COLORS[random.nextInt(COLORS.length)]);
                ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, user);
                // Report per-record delivery failures instead of dropping them silently.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
            }
        }
    }
}
3. 消费者
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
/**
 * Demo consumer: reads Avro "User" records from Kafka, resolving each message's
 * writer schema from Confluent Schema Registry via {@code KafkaAvroDeserializer}.
 */
public class MyConsumer {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "172.31.138.83:6667");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Confluent deserializer: looks up the schema id embedded in each message.
        config.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Same registry endpoint the producer wrote schemas to.
        config.put("schema.registry.url", "http://172.31.138.104:58088");
        config.put("group.id", "cg-101");

        String topic = "unitopic";

        // try-with-resources closes the consumer on exit, exactly like the
        // explicit try/finally it replaces.
        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, GenericRecord> batch = consumer.poll(100);
                for (ConsumerRecord<String, GenericRecord> message : batch) {
                    System.out.println(message.value());
                }
            }
        }
    }
}
网友评论