美文网首页
Kafka读书笔记:生产者(Producer)

Kafka读书笔记:生产者(Producer)

作者: RealityVibe | 来源:发表于2020-11-08 22:29 被阅读0次

    生产者

    客户端开发

    正常的生产逻辑需要具备以下几个步骤:

    1. 构建生产者客户端参数及创建相应的生产者实例
    2. 构建待发送的消息
    3. 发送消息
    4. 关闭生产者实例

    消息的发送

    发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)

    发后即忘:

    ​ 只管往Kafka发送消息,而不关心消息是否正确到达。在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式性能最高(因为不用做检查处理,牺牲了消息的安全性,达到高吞吐的目的),可靠性最差。

    同步:

    ​ 利用返回Future对象实现如下代码所示,失败时,捕捉异常,并做响应的异常处理。也可以通过注释的代码部分获取RecordMetadata对象,包含当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。

    异常
    • 可重试异常:

    ​ 常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。比如NetworkException 表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如LeaderNotAvailableException表示分区的leader副本不可用,这个异常通常发生在leader副本下线而新的 leader 副本选举完成之前,重试之后可以重新恢复

    ​ 配置retries 参数设置可重试异常的重试次数。如果重试超过设置次数之后还没有恢复,那么仍会抛出异常,进而发送的外层逻辑就要处理这些异常了。

    • 不可重试异常

    ​ RecordTooLargeException异常,暗示了所发送的消息太大,KafkaProducer对此不会进行任何重试,直接抛出异常。

    private static void sendData() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    
        KafkaProducer producer = new KafkaProducer<String, String>(properties);
    
    
        ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry",
                "Precision Producer", "France2");
        try {
            producer.send(record).get();
           // Future<RecordMetadata> future = producer.send(record);
           // RecordMetadata metaData = future.get();
        } catch (Exception e) {
            System.out.println("send Fail:" + e.getMessage());
        }
    
        producer.close();
    }
    
    异步发送
    1. 一般是在send()方法中指定一个Callback的回调函数,Kafka在返回响应式调用改函数来实现异步的发送请求。
    2. onCompletion()方法的两个参数是互斥的,消息发送成功时,metadata 不为 null 而exception为null;消息发送异常时,metadata为null而exception不为null。
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // recordMetadata和e必有一个为null
            if (e != null) {
                e.printStackTrace();
            } else {
                System.out.println(recordMetadata);
            }
        }
    });
    
    close()方法

    close()方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer。下面这个带超时时间参数的close()方法,只会等待timeout的时间,如果在timeout时间后仍未完成所有的请求处理,会强行退出,在实际应用中,一般使用无参close()方法。

    public void close(long timeout, java.util.concurrent.TimeUnit timeUnit)
    

    序列化

    ​ 生产者需要用序列化器把对象转换成字节数组才能通过网络发送给Kafka。而在接收方,消费者需要使用反序列化器把Kafka中收到的字节数组转换成对应的对象。

    生产者拦截器

    ​ KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)

    public interface ProducerInterceptor<K, V> extends Configurable {
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
        public void onAcknowledgement(RecordMetadata metadata, Exception exception);
        public void close();
    }
    

    在这 3 个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。

    onSend()

    调用时间点:将消息序列化和计算分区之前

    ​ KafkaProducer会在将消息序列化和计算分区之前调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。不建议在这里修改ProducerRecord的topic、key和partition信息(可能会对问题的排查和定位造成影响,以及出现难以预料的bug),比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩的功能。

    onAcknowledgement()

    调用时间点:在消息被应答(acknowledgement)之前或消息发送失败时,优先于用户设定的CallBack方法。

    ​ KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。这个方法运行在Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

    close()

    ​ close()方法主要用于在关闭拦截器时执行一些资源的清理工作。

    生产者客户端整体架构

    生产者客户端的整体架构

    分区

    如果key为null,会被随机分配到主题内各个可用的分区。如果key不为null,会对key进行计算决定分到哪个分区。

    优点

    ​ 使用Kafka自己的hash算法对key进行运算,并不会因为java版本的升级导致分区结果不同。

    缺点

    ​ 由于使用了hash算法,对横向扩展不友好。一旦主题增加了分区的个数,可能会造成旧的数据还在老的分区,新的数据被分配到了新的分区。如果要使用键来映射分区,最好在创建主题的时候就把分区规划好。

    自定义分区策略

    ​ 如果出现某个key的数据量特别大,导致按默认key分区后,对应的分区数据量明显过大,从而导致存储和性能上的问题。这时候就需要使用自定义分区策略实现(通过实现Partitioner)。

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.record.InvalidRecordException;
    import org.apache.kafka.common.utils.Utils;
    
    import java.io.InvalidClassException;
    import java.util.Map;
    
    /**
     * @author by yze on 2020/11/8
     * @since 202011
     */
    public class MyPartitioner implements Partitioner {
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    
            int numPartitions = cluster.partitionCountForTopic(topic);
            if (keyBytes == null || !(key instanceof String)) {
                throw new InvalidRecordException("不支持的数据类型");
            }
            if (((String) key).equals("special")) {
                // 专门定制到一个分区
                return numPartitions;
            }
            // 其它的key会计算后被分配到除了最后一个分区以外的分区
            return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    

    相关文章

      网友评论

          本文标题:Kafka读书笔记:生产者(Producer)

          本文链接:https://www.haomeiwen.com/subject/ghdkbktx.html