美文网首页Kafka技术专栏
Kafka解惑之Old Producer(1)—— Beginn

Kafka解惑之Old Producer(1)—— Beginn

作者: 朱小厮 | 来源:发表于2018-01-29 12:42 被阅读93次

    众所周知,目前Kafka的最新版本已经到达1.0.0,很多公司运行的kafka也大多升级到了0.10.x版本,Kafka的Producer客户端早已不再使用0.8.2.x就已基本停止维护的Scala版本的Producer了,那么我们还有必要了解它么?当然很有必要,通过Kafka Old Producer我们可以了解Kafka变迁升级的历史:旧版的Old Producer模型相对简单利于初始了解,通过对Old Producer的了解也可以慢慢的发现隐患的问题,这样进一步可以研究探讨解决方法,最后再通过对新版Producer的学习来提升对Kafka的认知,与此同时也可以让读者在遇到相似问题的时候可以借鉴Kafka的优化过来来优化自己的应用。以铜为鉴,可以正衣冠。

    在使用Scala版本的Kafka生产者客户端kafka.javaapi.producer.Producer时,实际上调用的是kafka.producer.Producer类。

    package kafka.javaapi.producer
    class Producer[K, V](private val underlying : kafka.producer.Producer[K, V]) extends scala.AnyRef {
      def this(config : kafka.producer.ProducerConfig) = { /* compiled code */ }
      def send(message : kafka.producer.KeyedMessage[K, V]) : scala.Unit = { /* compiled code */ }
      def send(messages : java.util.List[kafka.producer.KeyedMessage[K, V]]) : scala.Unit = { /* compiled code */ }
      def close : scala.Unit = { /* compiled code */ }
    }
    

    包括kafka-console-producer.sh的脚本(常用来测试发送消息之用)中,对于0.8.2.x版本如果不指定“-- new-producer”参数;或者对于.0.0版本如果指定“-- old-producer”参数的话,实际上内部调用的都是kafka.producer.Producer这个类。

    对于kafka-console-producer.sh脚本的内容如下:

    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-Xmx512M"
    fi
    exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
    

    我们看到实际上kafka-console-producer.sh的内容就是运行kafka.tools.ConsoleProducer而已,可以看到main函数代码块中的config.useOldProducer,这个笔者看的是1.0.0版本的代码,而0.8.2.2版本中的ConsoleProducer对应的是config.useNewProducer,稍有不同而已,不过如果都指定使用旧版的Scala的Producer,那么都是指kafka.producer.OldProducer。

    object ConsoleProducer {
      def main(args: Array[String]) {
        try {
            val config = new ProducerConfig(args)
            val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
            reader.init(System.in, getReaderProps(config))
    
            val producer =
              if(config.useOldProducer) {
                new OldProducer(getOldProducerProps(config))
              } else {
                new NewShinyProducer(getNewProducerProps(config))
              }
    

    进一步剖析,kafka.producer.OldProducer的内部构造很简单,关键代码如下:

    class OldProducer(producerProps: Properties) extends BaseProducer {
      // default to byte array partitioner
      if (producerProps.getProperty("partitioner.class") == null)
        producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)
      val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
    

    可以看到内部的producer最终还是实例化的kafka.producer.Producer。最终验证了开篇所述的旧版的Kafka生产者客户端即为Kafka.producer.Producer。

    新版的Java版的Kafka客户端是:org.apache.kafka.clients.producer.KafkaProducer,读者请注意区分。对于新版的KafkaProducer在以后的文章中会有详细介绍。

    下面就来深入了解下Kafka.producer.Producer(下面如无特殊说明都将Kafka.producer.Producer此简称为Producer)了。当实例化Producer的时候,首先要读取、解析以及校验配置信息的合法性,根据配置信息来实例化Producer。Producer的配置项有18个,比如设置分区器、消息压缩方式等,这些都比较好理解,而最主要的配置就是request.required.acks和producer.type这两个配置。

    request.required.acks是用来配置生产端消息确认的方式,在0.8.x这个系列的版本之中,可以配置为0,1,-1的值,也可以配置为其他的整数值,用来控制一条消息经由多少个ISR中的副本所在的Broker确认之后才向客户端发送确认信息,这个参数在之后的版本,比如1.0.0版本中就只能设置0,1,-1(all)这3(4)种取值,分别表示:

    1. 当request.required.acks=0时,这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
    2. 当request.required.acks=1(默认)时,这意味着producer在ISR中的leader已成功收到数据并得到确认。如果leader宕机了,则会丢失数据。
    3. 当request.required.acks=-1时,producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时,这样就变成了acks=1的情况。为了提高数据的可靠性,可以通过min.insync.replicas参数来辅助作用,当同步副本数不足时,生产者会跑出异常。

    有关kafka的消息可靠性的更深层次的讲解可以参考我2017年初的一篇博客:kafka数据可靠性深度解读,这篇博客主要是针对0.8.2.x版本的kafka做深层次的探讨,后续会对1.0.0版本做进一步的说明。

    Producer的发送模式分为同步(sync)和异步(async)两种情况,这一点可以通过参数producer.type来配置。同步模式会将消息直接发往broker中,而异步模式则会将消息存入LinkedBlockingQueue中,然后通过一个ProducerSendThread来专门发送消息。为了便于说明,笔者这里先对同步模式的情况来做说明,而异步模式只是在同步模式的基础上做了一些封装而已。

    class Producer[K,V](val config: ProducerConfig,
                        private val eventHandler: EventHandler[K,V])  // only for unit testing
      extends Logging {
    
      private val hasShutdown = new AtomicBoolean(false)
      private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
    
      private var sync: Boolean = true
      private var producerSendThread: ProducerSendThread[K,V] = null
      private val lock = new Object()
    
      config.producerType match {
        case "sync" =>
        case "async" =>
          sync = false
          producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                           queue,
                                                           eventHandler,
                                                           config.queueBufferingMaxMs,
                                                           config.batchNumMessages,
                                                           config.clientId)
          producerSendThread.start()
      }
    

    在讲述Producer的具体行为之前先来看一个发送方的Demo:

    public class ProducerScalaDemo {
        public static final String brokerList = "xxx.xxx.xxx.xxx:9092";
        public static final String topic = "topic-zzh";
    
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("serializer.class", "kafka.serializer.StringEncoder");
            properties.put("metadata.broker.list", brokerList);
            properties.put("producer.type", "sync");
            properties.put("request.required.acks", "1");
    
            Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(properties));
    
            String message = "kafka_message-" + new Date().getTime() + " edited by hidden.zhu";
            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic,null, message);
            producer.send(keyedMessage);
        }
    }
    

    我们可以看到再初始化Producer的时候之用了ProducerConfig这一个类型的参数,而在Producer的类定义中还用到了EventHandler这个类型的参数。在Scala语言中只有一个主构造函数,这个主构造函数的参数列表就是跟在类名后面括号中的各个的参数,如果要重载的话就需要自定义辅助构造函数,辅助构造函数必须调用主构造函数(this方法)。如此上面这个Demo中很显然的就调用了辅助构造函数来进行实例化,那么我们再来看下其对应的辅助构造函数:

    def this(config: ProducerConfig) =
      this(config,
           new DefaultEventHandler[K,V](config,
                                        CoreUtils.createObject[Partitioner](config.partitionerClass, config.props),
                                        CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props),
                                        CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                        new ProducerPool(config)))
    

    这里又引入了两个新的东西:DefaultEventHandler和ProducerPool,这个DefaultEventHandler继承了EventHandler这个类,这个是消息发送的关键。而ProducerPool内部是一个HashMap,其中的key是broker的id,而value就是每个broker对应的SyncProducer,这个SyncProducer就是真正的消息发送者。


    PS:消息中间件(Kafka、RabbitMQ)交流可加微信:hiddenzzh
    欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。


    欢迎关注

    相关文章

      网友评论

        本文标题:Kafka解惑之Old Producer(1)—— Beginn

        本文链接:https://www.haomeiwen.com/subject/kcjxzxtx.html