美文网首页
Kafka入门

Kafka入门

作者: 赵镇 | 来源:发表于2021-04-20 07:58 被阅读0次

Kafka生产者消费者,拦截器,分区器,序列化以及反序列化器

直接上代码
pom文件

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.2</version>
        </dependency>
                    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
    </dependencies>

发送消息的实体化数据

public class User {
    private Integer userId;
    private String username;

    public User() {
    }

    public User(Integer userId, String username) {
        this.userId = userId;
        this.username = username;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId=" + userId +
                ", username='" + username + '\'' +
                '}';
    }
}

生产者,其中包含了自定义序列化器和分区器以及生产者拦截器的一些配置,采用了异步发送的
模式

public class MyProducer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "115.159.150.169:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 设置自定义的序列化器
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, Kafkapartitioner.class);
        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInteceptor.class.getName());
        KafkaProducer<String, User> producer = new KafkaProducer<String, User>(configs);

        for (int i=1;i<5;i++){
            User user = new User();
            user.setUserId(400+i);
            user.setUsername("赵四"+i);

            ProducerRecord<String, User> record = new ProducerRecord<String, User>(
                    "tp_user_03",   // topic
                    user.getUsername(),   // key
                    user                  // value
            );


            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("消息发送异常");
                    } else {
//                        System.out.println("主题:" + metadata.topic() + "\t"
//                                + "分区:" + metadata.partition() + "\t"
//                                + "生产者偏移量:" + metadata.offset());
                        System.out.println(
                               "分区:" + metadata.partition());
                    }
                }
            });
        }


        // 关闭生产者
        producer.close();

    }
}

消费者,消费者中也加入反序列化和消费端的配置

public class UserConsumer {

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "115.159.150.169:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 设置自定义的反序列化器
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);

        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "user_consumer");
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_id");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, UserIdConsumerInteceptor.class.getName()
                +","+ UsernameConsumerInteceptor.class.getName());
        KafkaConsumer<String, User> consumer = new KafkaConsumer<String, User>(configs);

        // 订阅主题
        consumer.subscribe(Collections.singleton("tp_user_03"));

        final ConsumerRecords<String, User> records = consumer.poll(Long.MAX_VALUE);

        records.forEach(new Consumer<ConsumerRecord<String, User>>() {
            @Override
            public void accept(ConsumerRecord<String, User> record) {
                System.out.println(record.headers());
                System.out.println(record.value());
            }
        });

        // 关闭消费者
        consumer.close();

    }

}

接下来是序列化和反序列化的配置

public class UserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // do nothing
        // 用于接收对序列化器的配置参数,并对当前序列化器进行配置和初始化的
    }

    @Override
    public byte[] serialize(String topic, User data) {
        System.out.println("username "+data.getUsername());
        //data.setUserId(401);
        data.setUsername(data.getUsername()+"在学习kafka");
        return JSON.toJSONBytes(data);
//        try {
//            if (data == null) {
//                return null;
//            } else {
//                final Integer userId = data.getUserId();
//                final String username = data.getUsername();
//
//                if (userId != null) {
//                    if (username != null) {
//                        final byte[] bytes = username.getBytes("UTF-8");
//                        int length = bytes.length;
//                        // 第一个4个字节用于存储userId的值
//                        // 第二个4个字节用于存储username字节数组的长度int值
//                        // 第三个长度,用于存放username序列化之后的字节数组
//                        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
//                        // 设置userId
//                        buffer.putInt(userId);
//                        // 设置username字节数组长度
//                        buffer.putInt(length);
//                        // 设置username字节数组
//                        buffer.put(bytes);
//                        // 以字节数组形式返回user对象的值
//                        return buffer.array();
//                    }
//                }
//            }
//        } catch (Exception e) {
//            throw new SerializationException("数据序列化失败");
//        }
//        return null;
    }

    @Override
    public void close() {
        // do nothing
        // 用于关闭资源等操作。需要幂等,即多次调用,效果是一样的。
    }
}

可以看到,实际上在序列化的过程中,只要我们按照要求将数据转化为字节就可以了,同时我们可以在反序列化的过程中对数据进行修改或者处理

public class UserDeserializer implements Deserializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public User deserialize(String topic, byte[] data) {
//        ByteBuffer buffer = ByteBuffer.allocate(data.length);
//
//        buffer.put(data);
//        buffer.flip();
//
//        final int userId = buffer.getInt();
//        final int usernameLength = buffer.getInt();
//
//        String username = new String(data, 8, usernameLength);
//
//        return new User(userId, username);
        System.out.println("执行反序列化");
        return JSON.parseObject(data,User.class);
    }

    @Override
    public void close() {

    }
}

同样的在反序列化过程中需要对数据做对等的反序列化处理
生产者端拦截器,虽然我在代码中设置了重置分区,但是该设置并没有实现,可以确定的是拦截器是在分区器之前进行的操作

public class ProducerInteceptor implements ProducerInterceptor<String, User> {
    @Override
    public ProducerRecord<String, User> onSend(ProducerRecord<String, User> record) {
        System.out.println("拦截器1");
        Integer partition = record.partition();
        System.out.println("拦截器之前的partition是"+partition);
        String topic = record.topic();
        long timestamp = record.timestamp();
        String key = record.key();
        User user = record.value();
        Headers headers = record.headers();
        headers.add("patition inteceptor","partiton".getBytes());
        //拦截器partition重置所有的分区为2
        partition=2;
        ProducerRecord<String,User> producerRecord = new ProducerRecord<>(topic,partition,timestamp,key,user,headers);
        return producerRecord;
    }

    //发送成功回调的拦截器方法
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("回调成功拦截器");
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

消费者端拦截器

//第一个拦截器
public class UserIdConsumerInteceptor implements ConsumerInterceptor<String, User> {
    @Override
    public ConsumerRecords<String, User> onConsume(ConsumerRecords<String, User> records) {
        records.forEach(record->{
            record.value().setUserId(record.value().getUserId()+10);
        });
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("userId消费者拦截器提交偏移量了");
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

//第二个拦截器
public class UsernameConsumerInteceptor implements ConsumerInterceptor<String, User> {
    @Override
    public ConsumerRecords<String, User> onConsume(ConsumerRecords<String, User> records) {
        records.forEach(record->{
            record.value().setUsername(record.value().getUsername()+record.key());
        });
        return records;
    }

    //消费者提交偏移量时调用
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println(offsets.entrySet());
        System.out.println("提交偏移量了");
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

分区器,我们实现的时候是以数据的ID为分区的要素

public class Kafkapartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
       User user = JSON.parseObject(valueBytes, User.class);
        System.out.println(user.getUserId());
       int partition = user.getUserId()%3;
        System.out.println("分区器分区时的分区key"+user.getUsername()+"   分区"+partition);
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

最后实现的生产者端的结果,可以表明,执行顺序为。拦截器,序列化器,分区器


file

消费者端执行结果,可以看到反序列化器先执行,消费者拦截器后执行,消费者拦截器实际上是在回调的accept方法之后执行的,另外需要注意的一个参数是消费者拦截器和生产者拦截器都需要配置的是拦截器的全路径字符串的集合,且配置在前面的先执行


file

欢迎关注和点赞,以及总结的分类面试题https://github.com/zhendiao/JavaInterview

相关文章

  • Kafka视频集

    kafka企业级入门实战完整版 Kafka系列教程 Kafka入门 分布式消息通信Kafka原理剖析 阿里架构师直...

  • 再看kafka——spring boot集成kafka

    之前自己写过一篇入门文章kafka简单入门及与spring boot整合,主要是结合kafka官方的文档入门,学习...

  • Kafka学习

    MQ入门总结(六)Kafka的原理和使用 Kafka的架构原理,你真的理解吗? 真的,Kafka 入门一篇文章就够...

  • Kafka快速开始

    入门 1.简介 Kafka is a distributed streaming platform,kafka是一...

  • 【kafka】为什么要学习Kafka?

    KAFKA官方文档入门指南 http://ifeve.com/kafka-1 为什么要学习Kafka? http:...

  • (3)kafka的安装部署以及基本操作

    1.kafka 的安装部署 可以去看kafka的快速入门:http://kafka.apache.org/quic...

  • kafka入门

    Apache Kafka 入门 1.kafka简介和产生的背景 什么是 Kafka Kafka 是一款分布式消息发...

  • kafka极简入门(四)--常用配置

    回顾:kafka极简入门(三)--创建topic 前言 kafka针对broker, topic, produce...

  • kafka极简入门(二) --安装

    回顾kafka极简入门(一) --简介 1.单机版kafka安装 kafka需要结合zookeeper使用,所以本...

  • Kafka入门

    Kafka官网:http://kafka.apache.org/入门1.1 介绍Kafka™ 是一个分布式流处理系...

网友评论

      本文标题:Kafka入门

      本文链接:https://www.haomeiwen.com/subject/gkyglltx.html