Using the Kafka Producer API

Author: EchoZhan | Published: 2016-12-04 23:07

    Original article. If you repost it, please credit the original URL: http://www.jianshu.com/p/ec1063a13b53

    Kafka version: 0.10.0.1 (Scala 2.10 build, kafka_2.10-0.10.0.1)
    API jar: kafka-clients-0.10.0.1.jar

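    Background: after our Kafka cluster was upgraded from kafka_2.10-0.8.2.1 to kafka_2.10-0.10.0.1, we found that every class in the API we had been using to write logs to Kafka was now marked deprecated: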

    kafka_2.10-0.10.0.1.jar
    kafka.javaapi.producer.Producer;
    kafka.producer.KeyedMessage;
    kafka.producer.ProducerConfig;
    

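    The official documentation shows that a new set of producer APIs was introduced after version 0.8. It sends messages asynchronously and is said to perform better than the old API: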

    kafka-clients-0.10.0.1.jar
    org.apache.kafka.clients.producer.KafkaProducer;
    org.apache.kafka.clients.producer.Producer;
    org.apache.kafka.clients.producer.ProducerConfig;
    org.apache.kafka.clients.producer.ProducerRecord;
    

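    Our business requirements cannot tolerate losing any of the log messages written back to Kafka, so messages must be sent synchronously. The new API sends asynchronously by default; after a short investigation, I found that synchronous sending can be implemented as follows: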

    First, define a KafkaProducer:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public Producer<Integer, String> initKafkaProducer(String brokerList){
        Properties props = new Properties();
    
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);//format: host1:port1,host2:port2,...
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);//a batch size of zero will disable batching entirely
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);//send message without delay
        //acks=1: the partition leader acknowledges once the record is in its local log, without waiting for
        //followers; in the extreme case that the leader fails before replication, the message is lost
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);
        return kafkaProducer;
    }
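    The stated goal is that no log message may be lost, yet acks=1 still allows a loss when the leader fails before its followers copy the record. If that risk matters, a stricter configuration may be worth considering. The sketch below is an alternative under stated assumptions, not the author's setup: DurableProducerConfig is a hypothetical helper, the keys are standard Kafka producer config names, and the values (acks=all, three retries, one in-flight request per connection) are illustrative choices.

```java
import java.util.Properties;

// Hypothetical helper (not part of kafka-clients): builds producer settings that favor
// durability over latency. Keys are standard producer config names; values are assumptions.
class DurableProducerConfig {

    static Properties build(String brokerList) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList); // host1:port1,host2:port2,...
        // Wait until all in-sync replicas have the record, not just the partition leader.
        props.put("acks", "all");
        // Let the client retry transient send failures itself (in 0.10.x the default is 0).
        props.put("retries", "3");
        // Allow one unacknowledged request per connection so a retry cannot reorder records.
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

    The resulting Properties could be passed to new KafkaProducer<Integer, String>(props) in place of the ones built in initKafkaProducer. Note that acks=all only waits for the replicas currently in sync; the minimum number required is set by the broker- or topic-level min.insync.replicas, not by the producer.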
    

    With the settings above, each message is sent on its own. However, the documentation describes the send method as follows:

    The send is asynchronous and this method will return immediately once the record has been stored in the buffer of records waiting to be sent. This allows sending many records in parallel without blocking to wait for the response after each one. Since the send call is asynchronous it returns a Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.

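    So calling send by itself does not guarantee that the message has been written to Kafka. To send messages synchronously and check whether each one succeeded, call get() on the Future returned by every send call. (get() blocks the current thread until the send result comes back, whether it is a success or a failure.)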
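    A get() call without arguments can block for a long time if no response arrives. java.util.concurrent.Future also declares get(long, TimeUnit), which is available on the Future that send returns. The helper below is a minimal sketch of a bounded wait; SendAwaiter and its Outcome enum are hypothetical names, not part of kafka-clients, and the timeout value is left to the caller.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not part of kafka-clients): waits for the result of a send(),
// but never longer than the given timeout, and classifies what happened.
class SendAwaiter {

    enum Outcome { ACKED, FAILED, UNKNOWN }

    static Outcome await(Future<?> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit);          // blocks for at most `timeout`
            return Outcome.ACKED;               // the send completed successfully
        } catch (ExecutionException e) {
            // The send completed with an error; e.getCause() carries the reason.
            System.err.println("send failed: " + e.getCause());
            return Outcome.FAILED;
        } catch (TimeoutException e) {
            // No answer yet: the record may still be written later, so its fate is unknown.
            return Outcome.UNKNOWN;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Outcome.UNKNOWN;
        }
    }
}
```

    A call might look like SendAwaiter.await(kafkaProducer.send(message), 10, TimeUnit.SECONDS). An UNKNOWN result is not the same as a failure: re-sending in that case can produce a duplicate if the original record reaches the broker after all.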

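    import java.util.List;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public void send2Kafka(Producer<Integer, String> kafkaProducer, String topic, List<String> lines) throws InterruptedException, ExecutionException{
        for(String line : lines) {
            ProducerRecord<Integer, String> message = new ProducerRecord<Integer, String>(topic, line);
            kafkaProducer.send(message).get();//blocks until this record is acknowledged; throws if the send failed
        }
    }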
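    Calling get() right after each send waits for one full round trip per record, while the documentation quoted above explains that send is asynchronous precisely so that many records can be in flight at once. If confirmation per batch rather than per record is acceptable (an assumption, not the author's requirement), one middle ground is to issue all sends first and then wait on every returned Future in order. BatchSender is a hypothetical helper; the send function stands in for a lambda around kafkaProducer.send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper (not part of kafka-clients): issue every send first,
// then block on each Future in order, failing on the first error.
class BatchSender {

    static <T, R> List<R> sendAllThenAwait(List<T> items, Function<T, Future<R>> send) {
        List<Future<R>> pending = new ArrayList<>();
        for (T item : items) {
            pending.add(send.apply(item));         // returns immediately; the record is buffered
        }
        List<R> results = new ArrayList<>();
        for (int i = 0; i < pending.size(); i++) {
            try {
                results.add(pending.get(i).get()); // blocks until record i succeeds or fails
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for record " + i, e);
            } catch (ExecutionException e) {
                // Record i failed; records after it may or may not have been written.
                throw new IllegalStateException("send failed for record " + i, e.getCause());
            }
        }
        return results;
    }
}
```

    Usage would look like BatchSender.sendAllThenAwait(lines, line -> kafkaProducer.send(new ProducerRecord<Integer, String>(topic, line))). Two caveats: when record i fails, later records may already be stored, so re-sending the whole batch can create duplicates; and with retries enabled and more than one in-flight request per connection, records can arrive out of order.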
    

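    If get() throws an exception (an ExecutionException whose cause is the underlying error), the data was not written successfully, and the caller has to handle that error.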
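    How to handle the error is left open. One common option, shown here as a hypothetical sketch rather than the author's method, is to re-send the record a bounded number of times and give up with the last cause. RetryingSender is an illustrative name; the Supplier stands in for something like () -> kafkaProducer.send(message). A failure reported after the broker actually stored the record (for example a timeout) means a retry can write it twice, so this yields at-least-once delivery. kafka-clients marks transient errors as subclasses of org.apache.kafka.common.errors.RetriableException, so real code might retry only when the cause is an instance of that class; the producer's own retries setting is another alternative.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical helper (not part of kafka-clients): synchronous send with bounded retries.
class RetryingSender {

    static <R> R sendWithRetry(Supplier<Future<R>> send, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Throwable lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.get().get();     // issue the send, then block for its result
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted during attempt " + attempt, e);
            } catch (ExecutionException e) {
                lastError = e.getCause();    // the real reason the write failed
                System.err.println("attempt " + attempt + " failed: " + lastError);
            }
        }
        throw new IllegalStateException("giving up after " + maxAttempts + " attempts", lastError);
    }
}
```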

    References:

    1. http://kafka.apache.org/documentation.html#producerconfigs
    2. http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
    3. http://stackoverflow.com/questions/35984247/what-is-the-difference-between-kafka-producerrecord-and-keyedmessage
