美文网首页
kafka producer

kafka producer

作者: laowangv2 | 来源:发表于2021-05-18 22:11 被阅读0次

    一、参数

    1. 必要参数

    • boostrap.servers,boker的地址,部分即可
    • key.serializer和value.serializer,key和value的序列化器

    2. 其他

    二、发送消息

    1. 调用方式

    • 发后即忘
    • 同步
      producer.send(record).get()
    • 异步
      send with callback

    2. 异常

    • 可重试异常
      NetworkException、LeadernotAvailableException等,会自动重试retries参数的次数,然后会抛出异常
    • 不可重试异常
      比如RecordTooLargeException,直接抛异常

    三、发送流程

    架构
    图来自https://blog.csdn.net/shufangreal/article/details/110657052

    核心组件

    • 拦截器
    • 序列化器
    • 分区器
    • 消息累加器
      这里分区1、分区2就是topic的具体partition,ProducerBatch就是多个ProducerRecord。整个消息累加器的大小由buffer.mermory控制,默认32MB,满了之后阻塞max.block.ms(默认60000),然后抛出异常
    • InFlightRequests
      这里的node1、node2对应的是partition转换成ip+端口的形式,Request是kafka的发送协议结构。这里存着已发送但没有收到相应的请求,每个连接最多缓存max.in.flight.requests.per.connection(默认5)个。保证有序性时这个参数要设为1

    四、元数据更新

    producer需要通过broker获取kafka集群的元数据,当没有所需元数据或超过metradata.max.age.ms(默认300000)就拉取一次。这里涉及leastLoadedNode的概念,也就是InFlightRequests中等待Requests最少的Node,producer会向这个节点发送拉取请求,因为认为它负载最小。

    相关文章

      网友评论

          本文标题:kafka producer

          本文链接:https://www.haomeiwen.com/subject/nxiyjltx.html