美文网首页
读书笔记:Kafka生产者客户端入门

读书笔记:Kafka生产者客户端入门

作者: 东南枝下 | 来源:发表于2020-11-19 10:02 被阅读0次

Kafka生产者客户端入门

[TOC]

demo

/**
 * 生产者
 *
 * @author Jenson
 */
public class ProducerFastStart {

    private static final String BROKER_LIST = "localhost:9092";
    private static final String TOPIC = "topic-demo";

    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        // 重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        // 生产者拦截器
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        // 配置生产者客户端参数,并创建生产者实例
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 构造所需要发送的消息
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "hello kafka! again!");

        // 发送消息
        producer.send(record, new Callback() {

            /**
             * onCompletion()方法的两个参数是互斥的,
             * 消息发送成功时,metadata 不为 null 而exception为null;
             * 消息发送异常时,metadata为null而exception不为null。
             *
             * @param recordMetadata
             * @param e
             */
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println("topic : " + recordMetadata.topic() +
                            " ,hasOffset : " + recordMetadata.hasOffset() +
                            " ,offset : " + recordMetadata.offset() +
                            " ,partition : " + recordMetadata.partition());
                }
            }
        });

        // 关闭生产者客户端
        producer.close();
    }
}
/**
 * 客户化生产者拦截器
 *
 * @author Jenson
 */
public class MyProducerInterceptor implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        ProducerRecord record = new ProducerRecord(producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.timestamp(),
                producerRecord.key(),
                "prefix-" + producerRecord.value(),
                producerRecord.headers());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            System.out.println("----> 发送成功");
        }
    }

    @Override
    public void close() {
        System.out.println("----> 结束,回收资源");
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

生产者对象

org.apache.kafka.clients.producer.KafkaProducer

三个必填参数:

bootstrap.servers:连接Kafka集群所需要的broker地址清单,可以为多个地址,逗号分开

key.serializer和value.serializer:broker端接收的消息必须以字节数组(byte[])的形式存在

其他参数:
client.id : KafkaProducer对应的客户端id,默认值为“”,如果客户端不设置,则KafkaProducer会自动生成一个非空字符串,“producer-”与数字的拼接。

配置参数常量类:org.apache.kafka.clients.producer.ProducerConfig

KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。

发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)

消息发送

  • 生产者消息发送 ProducerRecord.send() ProducerRecord.send(value, callback)

  • 发送消息时报错 对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。 properties.put(ProducerConfig.RETRIES_CONFIG,10); //设置重试次数

  • **消息发送回调 **

    // 发送消息
        producer.send(record, new Callback() {
              // ... 详见demo
        });
    
    producer.send(record,callback1);  
    producer.send(record,callback2);  
    

    callback1在callback2前调用,回调函数的调用也可以保证分区有序。

  • producer.close(); // 关闭生产者客户端 close()方法会阻塞等待之前所有的发送请求完成后再关闭,回收所占用的资源

序列化

可以自定义序列化,实现 org.apache.kafka.common.serialization.Serializer 接口

分区器

  • 如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。

  • Kafka中提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner接口

  • 如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。

    如果 key 不为 null,那么计算得到的分区号会是所有分区中的任意一个;如果 key为null,那么计算得到的分区号仅为可用分区中的任意一个

  • 自定义分区可以实现Partitioner接口,通过配置参数partitioner.class来显式指定这个分区器

生产者拦截器

  • 生产者拦截器既可以用来在消息发送前做一些准备工作

    KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作

  • 也可以用来在发送回调逻辑前做一些定制化的需求,

    KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行

  • 生产者拦截器,自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口

    在KafkaProducer 的配置参数interceptor.classes中指定这个拦截器

  • 可以配置多个自定义拦截器,用逗号分隔

相关文章

网友评论

      本文标题:读书笔记:Kafka生产者客户端入门

      本文链接:https://www.haomeiwen.com/subject/ogkhiktx.html