美文网首页眼君的大数据之路
Kafka使用笔记(二、生产者详解)

Kafka使用笔记(二、生产者详解)

作者: 眼君 | 来源:发表于2020-10-12 11:27 被阅读0次

消息发送类型

  1. 发送即忘记
producer.send(record)
  1. 同步发送
//通过send()发送完消息后返回一个Future对象,然后调用Future对象的get()方法等待kafka响应。
//如果kafka正常响应,返回一个RecordMetadata对象,该对象存储消息的偏移量。
//如果Kafka发生错误,无法正常响应,就会抛出异常,我们便可以进行异常处理
try{
            Future<RecordMetadata> send = producer.send(record);
            RecordMetadata recordMetadata = send.get();
            System.out.println("topic:"+recordMetadata.topic());
            System.out.println("partition:"+recordMetadata.partition());
            System.out.println("offset:"+recordMetadata.offset());
        }catch (Exception e){
            e.printStackTrace();
        }
  1. 异步发送
try{
            producer.send(record,new Callback(){
                public void onCompletion(RecordMetadata metadata, Exception exception){
                    if (exception == null){
                        System.out.println(metadata.partition() + ":" + metadata.offset());
                    }
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }

序列化器

消息要到网络上进行传输,必须进行序列化,而序列化器的作用就是如此。
Kafka提供了默认的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),还有整型(IntegerSerializer)和字节数组(BytesSerializer)序列化器,这些序列化器都实现了接口(org.apache.kafka.common.serialization.StringSerializer)基本上能够满足大部分场景的需求。

分区器

本身Kafka有自己的分区策略,如果未指定就会使用默认的分区策略;
Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。如果key相同的话,那么就会分配到统一分区。

拦截器

Producer拦截器(interceptor)是个相当新的功能,它和consumer端的interceptor是在Kafka 0.10版本被引入的,主要用于实现clients端定制化控制逻辑。
生产这拦截器可以用在消息发送前做一些准备工作。

使用场景

  1. 按照某个规则过滤掉不符合要求的消息。
  2. 修改消息的内容。
  3. 统计类需求。

其他生产者参数

acks

这个参数用来指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息写入成功。acks是生产者客户端中非常重要的一个参数,它涉及到消息的可靠性和吞吐量之间的权衡。

  1. ack=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。如果出现问题生产者是感知不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
  2. ack=1,默认值1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能导致数据丢失,如果收到写成功通知,此时首领节点还没来得及同步数据到follower节点,首领节点崩溃,就会导致数据丢失。
  3. ack=-1,只有当所有参与复制的节点都收到消息时,生产者会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。
    注意:acks参数配置是一个字符串类型,而不是整数类型。
retries

生产者从服务器收到的错误可能是临时性错误(比如分区找不到首领)。在这种情况下,如果达到了retries设置的次数,生产者会放弃重试返回报错。默认情况下,生产者每次重试之间等待100ms,可以通过retries.backoff.ms参数来修改这个时间间隔。

batch.size

当有多个消息要被发送到同一个分区时,生产者会把它们放到同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只含有一个消息的批次也可能被发送。所以就算batch.size设置很大,也不会造成延迟,只会占用更多内存而已,如果设置太小,生产者会因为频繁发送消息而增加一些额外开销。

max.request.size

该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小。broker对可接收的消息最大值也有自己的限制(message.max.size),所以两边的配置最好匹配,避免生产者发送的消息被broker拒绝。

相关文章

网友评论

    本文标题:Kafka使用笔记(二、生产者详解)

    本文链接:https://www.haomeiwen.com/subject/sylupktx.html