Kafka生产者消费者,拦截器,分区器,序列化以及反序列化器
直接上代码
pom文件
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!-- NOTE(review): 1.0.2 is a very old client; newer 2.x/3.x clients are wire-compatible
     with old brokers and receive bug/security fixes. -->
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<!-- WARNING(review): fastjson 1.2.62 has known deserialization RCE vulnerabilities;
     upgrade to at least 1.2.83 (or migrate to fastjson2/Jackson). -->
<version>1.2.62</version>
</dependency>
</dependencies>
发送消息的实体化数据
/**
 * Message payload sent through Kafka in this demo.
 * Mutable JavaBean so fastjson can populate it via the no-arg constructor.
 */
public class User {

    // Numeric id of the user; also drives the custom partitioner.
    private Integer userId;
    // Display name of the user; used as the record key by the producer.
    private String username;

    /** No-arg constructor required by JSON deserialization. */
    public User() {
    }

    public User(Integer userId, String username) {
        this.userId = userId;
        this.username = username;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    @Override
    public String toString() {
        // %s renders null as "null", matching plain string concatenation.
        return String.format("User{userId=%s, username='%s'}", userId, username);
    }
}
生产者,其中包含了自定义序列化器和分区器以及生产者拦截器的一些配置,采用了异步发送的
模式
/**
 * Demo producer wired with a custom value serializer, partitioner and
 * producer interceptor; sends four User records asynchronously.
 */
public class MyProducer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "115.159.150.169:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Custom value serializer (User -> JSON bytes).
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
        // Custom partitioner: routes by userId.
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, Kafkapartitioner.class);
        // Interceptors are configured as a comma-separated list of class names.
        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInteceptor.class.getName());

        // try-with-resources guarantees close() (which flushes any buffered
        // records) even if construction of a record or send() throws.
        try (KafkaProducer<String, User> producer = new KafkaProducer<String, User>(configs)) {
            for (int i = 1; i < 5; i++) {
                User user = new User();
                user.setUserId(400 + i);
                user.setUsername("赵四" + i);
                ProducerRecord<String, User> record = new ProducerRecord<String, User>(
                        "tp_user_03",        // topic
                        user.getUsername(),  // key
                        user                 // value
                );
                // Asynchronous send; the callback runs on the producer I/O thread.
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            System.out.println("消息发送异常");
                        } else {
                            System.out.println(
                                    "分区:" + metadata.partition());
                        }
                    }
                });
            }
        }
    }
}
消费者,消费者中也加入反序列化和消费端的配置
/**
 * Demo consumer wired with a custom value deserializer and two consumer
 * interceptors; polls once, prints each record, then closes.
 */
public class UserConsumer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "115.159.150.169:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Custom value deserializer (JSON bytes -> User).
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "user_consumer");
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_id");
        // Start from the earliest offset when the group has no committed offset yet.
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Interceptors run in configuration order: userId interceptor first.
        configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, UserIdConsumerInteceptor.class.getName()
                + "," + UsernameConsumerInteceptor.class.getName());

        KafkaConsumer<String, User> consumer = new KafkaConsumer<String, User>(configs);
        try {
            // Subscribe to the topic the producer writes to.
            consumer.subscribe(Collections.singleton("tp_user_03"));
            // Single blocking poll for this demo (waits until records arrive).
            final ConsumerRecords<String, User> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<String, User> record : records) {
                System.out.println(record.headers());
                System.out.println(record.value());
            }
        } finally {
            // Close even if poll/processing throws, so the group rebalances promptly.
            consumer.close();
        }
    }
}
接下来是序列化和反序列化的配置
/**
 * Serializes a {@link User} to JSON bytes with fastjson.
 */
public class UserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed; this hook would receive producer config
        // entries for initializing the serializer.
    }

    /**
     * @param topic topic the record is headed to (unused here)
     * @param data  payload; may be null for tombstone records
     * @return JSON bytes, or null when {@code data} is null (Kafka convention)
     */
    @Override
    public byte[] serialize(String topic, User data) {
        // Fix: the original dereferenced data unconditionally and threw an
        // NPE for null payloads; Kafka serializers must map null -> null.
        if (data == null) {
            return null;
        }
        System.out.println("username "+data.getUsername());
        // NOTE(review): demo-only — mutating the caller's object inside a
        // serializer is a side effect and is visible to the sender.
        data.setUsername(data.getUsername()+"在学习kafka");
        return JSON.toJSONBytes(data);
    }

    @Override
    public void close() {
        // Nothing to release. Must be idempotent (may be called repeatedly).
    }
}
可以看到,实际上在序列化的过程中,只要我们按照要求将数据转化为字节就可以了,同时我们可以在序列化的过程中对数据进行修改或者处理
/**
 * Deserializes JSON bytes back into a {@link User} with fastjson.
 */
public class UserDeserializer implements Deserializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed.
    }

    /**
     * @param topic topic the record came from (unused here)
     * @param data  raw bytes; may be null/empty for tombstone records
     * @return the parsed User, or null for a null/empty payload
     */
    @Override
    public User deserialize(String topic, byte[] data) {
        // Fix: guard tombstones / empty payloads instead of handing them to
        // the JSON parser.
        if (data == null || data.length == 0) {
            return null;
        }
        System.out.println("执行反序列化");
        return JSON.parseObject(data, User.class);
    }

    @Override
    public void close() {
        // Nothing to release; idempotent.
    }
}
同样的在反序列化过程中需要对数据做对等的反序列化处理
生产者端拦截器。可以确定的是拦截器是在序列化器和分区器之前执行的。我在代码中设置了重置分区但并没有生效:很可能的原因是 record.timestamp() 在未显式指定时间戳时返回 null,拆箱为 long 时抛出 NullPointerException,而 Kafka 会捕获并记录拦截器抛出的异常、继续使用原始记录,因此分区重置从未真正应用(若拦截器成功返回了带分区的记录,分区器将被跳过)。
/**
 * Producer interceptor: tags each record with a header and reroutes every
 * record to partition 2 before serialization/partitioning.
 */
public class ProducerInteceptor implements ProducerInterceptor<String, User> {
    @Override
    public ProducerRecord<String, User> onSend(ProducerRecord<String, User> record) {
        System.out.println("拦截器1");
        Integer partition = record.partition();
        System.out.println("拦截器之前的partition是"+partition);
        String topic = record.topic();
        // BUG FIX: record.timestamp() returns a Long that is null when the
        // sender did not set an explicit timestamp. Unboxing it into a
        // primitive `long` threw a NullPointerException here. Kafka catches
        // and logs interceptor exceptions and continues with the ORIGINAL
        // record — which is why the partition override appeared to have no
        // effect. Keep it boxed; ProducerRecord accepts a nullable Long.
        Long timestamp = record.timestamp();
        String key = record.key();
        User user = record.value();
        Headers headers = record.headers();
        headers.add("patition inteceptor","partiton".getBytes());
        // Reroute every record to partition 2. When the returned record has
        // an explicit partition, the configured partitioner is bypassed.
        partition = 2;
        return new ProducerRecord<>(topic, partition, timestamp, key, user, headers);
    }

    /** Called on broker ack (or failure) for every send. */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("回调成功拦截器");
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }
}
消费者端拦截器
//第一个拦截器
/**
 * First consumer interceptor in the chain: bumps every record's userId by 10
 * before the records are handed to application code.
 */
public class UserIdConsumerInteceptor implements ConsumerInterceptor<String, User> {
    @Override
    public ConsumerRecords<String, User> onConsume(ConsumerRecords<String, User> records) {
        // Mutate each record's payload in place; the same batch is returned.
        for (ConsumerRecord<String, User> consumed : records) {
            User payload = consumed.value();
            payload.setUserId(payload.getUserId() + 10);
        }
        return records;
    }

    /** Invoked when offsets are committed for this consumer. */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("userId消费者拦截器提交偏移量了");
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }
}
//第二个拦截器
/**
 * Second consumer interceptor in the chain: appends the record key to every
 * record's username before the records reach application code.
 */
public class UsernameConsumerInteceptor implements ConsumerInterceptor<String, User> {
    @Override
    public ConsumerRecords<String, User> onConsume(ConsumerRecords<String, User> records) {
        // Mutate each record's payload in place; the same batch is returned.
        for (ConsumerRecord<String, User> consumed : records) {
            User payload = consumed.value();
            payload.setUsername(payload.getUsername() + consumed.key());
        }
        return records;
    }

    /** Invoked when offsets are committed for this consumer. */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println(offsets.entrySet());
        System.out.println("提交偏移量了");
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }
}
分区器,我们实现的时候是以数据的ID为分区的要素
/**
 * Custom partitioner: routes records by the payload's userId.
 * Note it re-parses the already-serialized JSON value bytes.
 */
public class Kafkapartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Fix: use the topic's actual partition count instead of a
        // hard-coded 3 (falls back to 3 if metadata is unavailable).
        Integer count = cluster.partitionCountForTopic(topic);
        int numPartitions = (count == null || count == 0) ? 3 : count;
        // Fix: guard tombstones / unparsable payloads instead of NPE-ing.
        if (valueBytes == null) {
            return 0;
        }
        User user = JSON.parseObject(valueBytes, User.class);
        if (user == null || user.getUserId() == null) {
            return 0;
        }
        System.out.println(user.getUserId());
        // Fix: floorMod keeps the result non-negative for negative ids
        // (the `%` operator can return a negative partition).
        int partition = Math.floorMod(user.getUserId(), numPartitions);
        System.out.println("分区器分区时的分区key"+user.getUsername()+" 分区"+partition);
        return partition;
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }
}
最后实现的生产者端的结果,可以表明,执行顺序为。拦截器,序列化器,分区器
file
消费者端执行结果,可以看到反序列化器先执行;消费者拦截器的 onConsume 实际上是在 poll() 内部、记录返回给应用代码之前执行的,而 onCommit 在提交偏移量时才触发,所以其日志出现在 accept 方法之后。另外需要注意的一个参数是:消费者拦截器和生产者拦截器都需要配置拦截器全路径类名字符串的集合,且配置在前面的先执行
file
欢迎关注和点赞,以及总结的分类面试题https://github.com/zhendiao/JavaInterview
网友评论