美文网首页
2018-07-27-producer

2018-07-27-producer

作者: 迪奥炸 | 来源:发表于2018-07-27 19:38 被阅读0次

    https://blog.csdn.net/cjf_wei/article/details/77920435

    Producer是一个用于向kafka集群发送数据的Java客户端

    //创建kafka producer

    Properties props = new Properties();

    props.put("bootstrap.servers", "192.168.137.200:9092");

    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // KafkaProducer对象实例化方法,可以使用map形式的键值对或者Properties对象来配置客户端的属性

    Producer procuder = new KafkaProducer(props);

    producer包含一个用于保存待发送消息的缓冲池

    缓冲池中消息是还没来得及传输到kafka集群的消息。

    位于底层的kafka I/O线程负责将缓冲池中的消息转换成请求发送到集群。

    如果在结束produce时,没有调用close()方法,那么这些资源会发生泄露。 

    bootstrap.servers:用于初始化建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2; 

    acks: 生产者需要server端在 接收到消息后,进行反馈确认的尺度,主要用于消息的可靠性传输;acks=0表示生产者不需要来自server的确认;acks=1表示server端将消息保存后即可发送ack,而不必等到其他follower角色都收到了该消息;acks=all(or acks=-1)意味着server端将等待所有的副本都被接收后才发送确认。

    retries:生产者发送失败后,重试的次数

    batch.size:当多条消息发送到同一个partition时,该值控制生产者批量发送消息的大小,批量发送可以减少生产者到服务端的请求书,有助于提高客户端和服务端的性能。

    linger.ms:默认情况下缓冲区的消息会被立即发送到服务端,即使缓冲区的空间并没有被用完。可以将该值设置为大于0的值,这样发送者将等待一段时间后,再向服务端发送请求,以实现每次请求可以尽可能多的发送批量消息。 

    batch.size和linger.ms是两种实现让客户端每次请求尽可能多的发送消息的机制,它们可以并存使用,并不冲突。 

    buffer.memory:生产者缓冲区的大小,保存的是还未来得及发送到server端的消息,如果生产者的发送速度大于消息被提交到server端的速度,该缓冲区将被耗尽。 

    key.serializer,value.serializer说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。


    /**record:key-value形式的待发送数据  *callback:到发送的消息被borker端确认后的回调函数*/

    public Future<RecordMetadata> send(ProducerRecord<K,V> record); // Equivalent to send(record, null)

    public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback);

    send方法负责将缓冲池中的消息 异步发送到broker的指定topic中,

    异步发送是指,方法将消息存储到底层待发送的I/O缓存后,将立即返回,这可以实现并行无阻塞的发送更多消息。

    send方法的返回值是RecordMatadata类型,它含有消息将被投递的partition消息,该条消息的offset,以及时间戳

    因为send返回的是Future对象,因此在该对象上调用get()方法将阻塞直到相关的发送请求完成并返回元数据信息;或者在发送时抛出异常而退出。

    1、阻塞

    2、利用回调函数和异步发送方式来确认消息发送的进度


    flush:立即发送缓存数据

    public void flush

    调用该方法将使得缓冲区的所有消息被立即发送(即使linger.ms参数被设置为大于0),

    且会阻塞直到这些相关消息的发送请求完成。

    flush方法的前置条件是:之前发送的所有消息请求已经完成。

    一个请求被视为完成是指:根据acks参数配置项收到了相应的确认,或者发送中抛出异常失败了。


    partitionsFor

    //获取指定topic的partition元数据信息

    public List<PartitionInfo> partitionsFor(String topic);


    close

    //关闭producer,方法将被阻塞,直到之前的发送请求已经完成

    public void close();// equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)

    public void close(long timeout,TimeUnit timeUnit); //同上,方法将等待timeout时长,以让未完成的请求完成发送

    相关文章

      网友评论

          本文标题:2018-07-27-producer

          本文链接:https://www.haomeiwen.com/subject/xbcgmftx.html