美文网首页
2.一文搞定Kafka的生产者

2.一文搞定Kafka的生产者

作者: 迷茫的小黑狗 | 来源:发表于2022-06-24 16:34 被阅读0次

    1.介绍

    这篇文章就是为了帮助大家来理解Produce存在的意义以及工作的方式来进行编写的,整篇文章也会从整体组成、运行时架构两个方面去进行讲解,希望大家能够看懂,如果有什么错误的描述,希望大家可以留言告诉我,让我能够及时的做出更改。

    2.整体组成

    kafka的生产者主要就是负责给kafka中发送数据的,其核心的本质工作内容就是快速且不丢失完成自己的工作内容。我还是先画一个组成架构图,方便大家理解。

    kafka.jpg

    这个图差不多就是这个样子的,也是从根本上描述了生产者到kafka之间的工作结构,并且通过箭头还能够读出工作流程。接下来我们就先说结构,再说流程。

    2.1 结构

    从结构上看呢,kafka的生产者主要有两个线程来完成工作,一个是main线程,另外一个是sender线程。main线程呢,主要负责数据的读取、拦截、序列化和发送,并且在启动的时候,会创建一个双端队列RecordAccmulator,所有的数据都是发送到这个双端队列中来的,这个双端队列的内存默认是32M,它的内部是一个一个的Dquene,而每个Dquene中都有很多个块,也就是ProducerBatch。这个块默认是16K。当它符合某种规则的时候,数据就会从Dquene中进入到下一个阶段,这个规则有两种,一种是当batch的大小到达触发发送机制的大小的时候数据被发送,另外一种就是设置的等待时间到了,数据就会被发送。上述内容记本上就是main线程要做的事情了。

    至于sender线程,它会不断的从双端队列中拉取数据发送给KafkaBroker,但是单纯的拉取数据肯定是不够严谨的。当sender线程开始工作的时候,它主要要做两件事情,第一件事情就是创建networkClient用于存储对kafka发送数据的请求,这个请求默认是5个。第二件事是创建一个selector用于发送数据,并且等待kafka的回应,如果kafka回应了,证明数据写入完成,那这个时候就会清理掉请求列表中的请求,并且通知双端队列清除已经写完数据的对应的Dquene中的batch。如果发送数据失败了,那selector就i会触发重试,这个重试的次数就是int类型的最大值。

    2.2 工作流程

    降完了组成就要将工作流程了,工作流程就干了一件事儿,那就是将生产者的数据发送给Kafka。

    1.kafkaProducer创建main线程和sender线程用于数据的发送工作。

    2.main线程创建对应的拦截器、序列化器、分区器,并且会创建一个能够缓存发送数据的双端队列。

    3.然后分区器向双端队列中写入数据,在双端队列的内部由一个一个的Dquene消息队列来存储这些数据。

    4.sender线程会周期性的监控这个双端队列,一旦达到触发拉取的时间或者大小就会向kafka的producer发送请求,请求发送数据。

    5.这些请求内容会被存储在networkCLient中。

    6.一旦kafka能够接收数据,就由sender线程的selector来负责数据的发送工作。

    7.当kafka接收完成数据之后,会发送ack验证码给selector,seletor会根据是否接收到ack码来确定数据是否发送成功。

    8.数据发送成功之后,selector会删除network中的请求,通知main线程删除双端队列中的Dquene中的batch中的数据;如果失败,那就重试,默认可以重试Int类型的最大值次。

    3.数据发送方式

    producer发送数据的方式有两种,分别是异步发送和同步发送。并且还要区分是否需要回调信息,那接下来,就让我一个一个的分别介绍给大家。

    3.1 不带回调信息的异步发送

    大家注意一下哈,这个异不异步不是针对sender到kafka的异步,而是外部数据到双端队列的异步。因为生产者发送数据实际上就是往双端队列中写入数据,然后sender从双端队列中拿取数据给kafka的过程,因为这个异步就是围绕着双端队列进行划分的。在这一阶段,异步就是指外部数据一直往双端队列中写入数据,并不会在乎Dquene中的数据是否成功写入到kafka中。

    如果用代码实现的话,大家可以看一下:

    <pre data-language="java" id="LvH9u" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//1.引入依赖
    <dependency>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                    <version>3.0.0</version>
                </dependency>
    //2.编写程序
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class KafkaProducerStudy {
        public static void main(String[] args) {
            //todo 0.创建配置类
            Properties properties = new Properties();
            //kafka的地址
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"had1:9092");
            //kafka序列化
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            //todo 创建kafka生产者对象
            KafkaProducer<String, String> kafkaProducer = new
                    KafkaProducer<String, String>(properties);
    
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(
                        new ProducerRecord<>("testTopic","number"+i)
                );
            }
    
            //todo 关闭资源
            kafkaProducer.close();
        }
    }
    </pre>
    

    是不是看起来没什么感觉?哈哈,我也觉着没什么感觉,但是带回调信息的异步发送就有一点点不一样了,不过因为需要等待回调信息的返回,所以速度会慢一些。

    3.2 异步发送带回调信息

    这个回调函数会在producer的selector接收到ack机制的时候被调用,调用方式也是异步的。如果返回的信息是异常的话,就代表着数据发送失败了。如果失败了,程序会自动重试。

    具体的代码如下:

    <pre data-language="java" id="XeBCc" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProducerStudyWithSentTwo {
        public static void main(String[] args) {
            //todo 1.配置类
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getClass());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
    
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("testTopic","number:"+i)
                        , new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e == null){
                            System.out.println("主题:"+recordMetadata.topic()+"->"+
                                    "分区:"+recordMetadata.partition());
                        }else {
                            e.printStackTrace();
                        }
                    }
                });
            }
    
            //todo 关闭生产者
            producer.close();
        }
    }</pre>
    

    3.3 同步发送API

    同步发送就需要双端队列等到kafka返回数据保存成功的消息之后,外部数据才能够把数据发送到双端队列中。而且同步发送在代码上就多了一个get

    <pre data-language="java" id="JkMJa" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class KafkaProducerStudyWithSentThr {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //todo 添加配置文件信息
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            //todo 创建kafka生产者
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    
            for (int i = 0; i < 5; i++) {
                producer.send(new ProducerRecord<>("testTopic","number:"+i)).get();
            }
            //todo 关闭资源
            producer.close();
        }
    }</pre>
    

    4.生产者的分区

    生产者的分区主要是针对kafka的topic来实现的,如果一份数据只固定的发送给一个topic的一个partition的话,那效率是很低下的。所以生产者发送数据的时候,是针对topic中所有的leader分区来进行拆解发送的,这样不仅会让发送的数据能够分散在多个broker上,实现负责均衡。还能够提高并行度,毕竟是向多个partition同时发送数据,肯定要比向一个partition发送来的快得多。

    不过既然要通过使用分区的方式完成数据的发送,那么就需要有对应的发送策略还维持生产者发送数据时的平衡。Kafka生产者的默认分区策略是:

    1.指明分区的时候,直接向这个分区写入数据,所有的数据都如此写入。

    2.当没有指明分区的时候,如果有key值,可以对key值取Hash,然后对分区数取模,这样所有的数据就能够进入到不同的分区中去了。

    3.如果既没有key也没有partition,那么kafka就会采用黏粘性分区器(Sticky),随机选择一个分区,然后一直向这个分区中写入数据,如果双端队列中的Dquene满了,就换一个其他的分区继续写,以此类推。

    下面用几段代码来描述一下不同的场景:

    <pre data-language="java" id="vxmOq" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//指定topic
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProducerStudyWithSentFor {
        public static void main(String[] args) {
            //todo 添加配置文件信息
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            //todo 创建kafka生产者
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    
            for (int i = 0 ; i< 5 ; i++){
                //仅指定partition
                producer.send(new ProducerRecord<>("testTopic", 0, "", "number:" + i)
                        , new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                                //todo 观察是否进入到了对应的分区
                                if (e == null){
                                    System.out.println("数据进入到了分区:"+recordMetadata.topic());
                                }else {
                                    e.printStackTrace();
                                }
                            }
                        });
            }
            //todo 关闭ziyuan
            producer.close();
        }
    }</pre>
    
    <pre data-language="java" id="jZDNu" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//todo 没指明partition的,但是指明了key
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProducerStudyWithSentFive {
        public static void main(String[] args) {
            //todo 添加配置文件信息
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            //todo 创建kafka生产者
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    
            for (int i = 0 ; i< 5 ; i++){
                //仅指定key,观察数据是否都进入到了一个分区内部
                producer.send(new ProducerRecord<>("testTopic", "a", "number:" + i)
                        , new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                                //todo 观察是否进入到了对应的分区
                                if (e == null){
                                    System.out.println("数据进入到了分区:"+recordMetadata.topic());
                                }else {
                                    e.printStackTrace();
                                }
                            }
                        });
            }
            //todo 关闭ziyuan
            producer.close();
        }
    }
    </pre>
    

    除此之外,kafka生产者还能够自己定义分区器,通过如下方法,就能够让包含博主昵称的数据进入到0号分区了。

    <pre data-language="java" id="OauTG" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//todo 实现partitioner接口,重写patition方法
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    
    public class MyPartitioner implements Partitioner {
        @Override
        public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
            String msgValue = s.toString();
    
            int partition;
    
            if (msgValue.contains("迷茫的小黑狗")){
                partition = 0;
            }else {
                partition = 1;
            }
            return partition;
        }
        //关闭资源
        @Override
        public void close() {
    
        }
        //配置方法
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    }
    </pre>
    

    那既然创建了自定义的分区器,就要应用到程序中,具体的应用方法如下段代码所示:

    <pre data-language="java" id="aClnC" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">public class KafkaProducerStudyWithSentSix {
        public static void main(String[] args) {
            //todo 添加配置文件信息
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                    MyPartitioner.class.getName());
            //todo 创建kafka生产者
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    
            for (int i = 0 ; i< 5 ; i++){
                if (i % 2 ==0){
                //不包含博主昵称的
                producer.send(new ProducerRecord<>("testTopic", "number:" + i)
                        , new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                                //todo 观察是否进入到了对应的分区
                                if (e == null){
                                    System.out.println("数据进入到了分区:"+recordMetadata.topic());
                                }else {
                                    e.printStackTrace();
                                }
                            }
                        });
                }else {
                    producer.send(new ProducerRecord<>("testTopic", "迷茫的小黑狗" + i)
                            , new Callback() {
                                @Override
                                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                                    //todo 观察是否进入到了对应的分区
                                    if (e == null){
                                        System.out.println("数据进入到了分区:"+recordMetadata.topic());
                                    }else {
                                        e.printStackTrace();
                                    }
                                }
                            });
                }
            }
            //todo 关闭资源
            producer.close();
        }
    }</pre>
    

    4.1 如何提高生产者的吞吐量

    提升吞吐量的目的就是为了能够加快数据的发送,如果我们把数据单次发送的能力加强了,是不是就代表着生产者的吞吐量加强了呢?通过这个思路,我们可以选择增大双端队列的容积、延长Dquene的等待时间、加大Dquene中batch的块容积,通过这几种方法,就能够强化吞吐量啦。

    <pre data-language="java" id="rvQzk" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProducerStudyWithSentServen {
        public static void main(String[] args) {
            //todo 添加配置文件信息
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                    MyPartitioner.class.getName());
            //将默认的16k增大到32K
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384 * 2 );
            //增大等待时间,从默认的0(不生效)改为5-100ms
            properties.put(ProducerConfig.LINGER_MS_CONFIG,5);
            //增大双端队列的缓冲区大小,由默认的32变成64
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432 * 2);
            //开启压缩,增大单批次数据传输的数量
            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
    
            //todo 创建kafka生产者
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    
            producer.send(new ProducerRecord<>("testTopic","迷茫的小黑狗"));
            //todo 关闭资源
            producer.close();
        }
    }
    </pre>
    

    4.2 数据的可靠性

    既然提交了吞吐量,就代表着数据发送的能力变强了。不过能力变强了,准度可不能下降,所以还要研究一下数据传输的可靠性。不过大家要记住哈,这个可靠性不是精准一次性,它只会代表数据不丢失,不会让数据只有一次。

    在kafka生产者可靠性这一部分,引入了一个ack机制和ISR队列两个概念,所以在具体说之前,我要给大家讲一下这两个机制都是干嘛的:

    ack:kafka对producer发送数据的一种应答机制,不同的返回结果代表了kafka对发送过来的数据的不同的处理规则。

    ISR:所有分区的副本集合,如果一个topic里面有三个分区,每个分区有三个副本,那么它的ISR队列就是【0,1,2】,是一个分区的Leader和Follower的集合。

    明确了这两个概念之后,就可以继续往下说啦,sender线程的seletor方法将数据发送到kafka的broker之后,broker会返回一个ack码,这个ack码有三种不同的内容,分别是 0 ,1 ,-1。

    0:生产者发送过来数据之后就不管了,丢失与否不重要。

    1:Leader把生产者发送过来的数据保存下来之后,就返回给producer。Follower是否同步不重要。

    -1(all):只有当leader接收完成数据,所有的Follower都备份完了,才会返回这个。

    这三种ack码可以由我们来指定,当我们确定时候那种策略之后,kafka生产者就会尊崇那种策略,并且依照策略进行工作。但是0和1都会面临数据丢失的问题,所以如果想要保证数据最少一次性,那就要使用ack码为-1的这种情况。

    代码实现如下:

    <pre data-language="java" id="qXUp8" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">public class KafkaProducerAck {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                    MyPartitioner.class.getName());
            //todo 设置ack为 -1
            properties.put(ProducerConfig.ACKS_CONFIG,"all");
            //todo 设置发送失败的重试次数
            properties.put(ProducerConfig.RETRIES_CONFIG,10);
    
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            for (int i = 0; i < 100; i++) {
                kafkaProducer.send(new ProducerRecord<>("testTopic","number:"+i));
            }
    
            kafkaProducer.close();
        }
    }</pre>
    

    4.3 数据的唯一性

    ack机制只能保证数据不丢失,也就是数据的最少一次性,但是也正是因为这个最少一次性,就会造成数据丢失的隐患。所以就必须使用某种技术,来辅助ack机制完成数据的精准一次性。

    那这个时候就又要请出老哥俩了,幂等和事务。

    4.3.1 幂等

    幂等是默认开启的,它会保证数据无论是发送了多少次,在broker里面也就只有一条,不会让数据重复。也就是说,它是通过下面的这种方式来完成数据的精准一次性的:

    精准一次性 = 幂等 + 最少一次性

    不过幂等性维持的这个精准一次性仅仅只能在单会话、单分区内有效,单分区内是无可厚非的,因为分区间数据本来就是没什么关系的。只不过单会话就比较惨了,如果kafka一重启,可能会导致有些数据就会重复,这是因为幂等的判断标准是:

    <PID,Partition,SeqNumber>这样一个类似于元组的信息作为判断主键,如果主键一样那这条数据就不会被写入啦。在三个参数中,SeqNumber是一直单调递增的,Partition是分区号,正好对应了同分区内部的这样一个概念,儿PID是kafka每启动一次就会重新分配的东西,所以幂等只能保证是在单分区、单会话内。

    4.3.2 事务

    如果向开启事务,就要让幂等性保证开启,不过幂等默认就是开启的,所以做什么改变,直接开启事务就好。当使用事务之后,一旦发生故障,那么在同一个事务批次里面在故障前写入的数据就会回滚,回滚就会让重复写入的数据消失,所以也就实现了精准一次性。

    只不过事务需要多加入一个内容,就是自定义一个唯一的transactional.id,有了它,重启之后也能找到原来的位置,这样就使得数据的精准一次性不会再像幂等一样仅局限在单会话里面了。

    虽然好用,但是还是有点繁琐的,因为如果使用事务的话,还要加入一个事务协调器(transaction coordinator)的概念,下面我就按照开启事务的场景,给大家说一下开启事务后的提交流程。

    1.producer向事务协调器请求producer id,事务协调器会为不同的producer返回对应的PID。

    2.produce得到PID之后,就会向topic里面的Leader分区发送数据,当数据发送一阶段之后,生产者请求提交事务。

    3.事务协调器会处理这个事务提交请求,它会向kafka中存储事务信息的特殊主题发送请求,希望可以持久话这次事务提交请求。

    4.这个topic内部默认由50个分区,每一个分区都负责处理一部分事务。当事务提交请求发送过来的时候,就会根据这个事务TID的hash值,对50取模,算出自己该保存在哪个partition中,那这个partition的leader副本所在的broker节点,就是这个TID对应的事务协调器节点。

    <pre data-language="java" id="EaW1t" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//事务实现的代码如下
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProducerTran {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            //todo 定义producer 的 Transaction ID
            properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                    "Tid_0");
    
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
            //todo 初始化事务
            kafkaProducer.initTransactions();
            //todo 开启事务
            kafkaProducer.beginTransaction();
            try {
                for (int o = 5 ; o < 100 ; o ++){
                    kafkaProducer.send(new ProducerRecord<>("testTopic","number:" + o));
                }
                //定义一个异常,看看事务有没有回滚
                int i = 1 / 0;
                //todo 提交事务
                kafkaProducer.commitTransaction();
            }catch (Exception e){
                //todo 终止事务
                kafkaProducer.abortTransaction();
            }finally {
                //关闭资源
                kafkaProducer.close();
            }
        }</pre>
    

    4.4 生产者顺序

    生产者顺序发送数据,就是生产者发送数据保障的最后一个阶段啦。这个顺序就是单分区内有序,多分区间无序的。那么如果解决数据乱序呢?还记得之前提到的sender线程中维护了一个network client么,它的内部就是存储的一个又一个的请求,这个请求就是数据向kafka中发送数据的请求顺序,如果kafka的broker按照这个顺序来读取数据,数据就不会出现乱序的问题。

    但是这一组(5个为一组)中的请求,如果在读取第三个请求中的数据的时候出错了,等到第三个请求重试完成了,第4、5个请求都发送完了怎么办?

    这不是问题,因为在1.x版本之后,kafka内部会缓存这些请求的顺序,如果发生了12453的情况,那kafka是不会向下游发送数据的,而是会按照缓存的顺序,将处理过的请求排序,在向下游发送。

    5.结尾

    截止到目前位置,与kafka生产者有关的内容就说完啦。明天我会从kafka本身的角度来讲讲kafka处理数据时候是个什么样子的情况。

    相关文章

      网友评论

          本文标题:2.一文搞定Kafka的生产者

          本文链接:https://www.haomeiwen.com/subject/vffbvrtx.html