4. kafka生产者&消费者

作者: 阿飞的博客 | 来源:发表于2018-06-28 18:18 被阅读261次

    原生方式

    无论是生产者还是消费者,引入的依赖都是kafka-clients,maven坐标如下:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
    </dependency>
    

    生产者

    kafka生产者对象就是KafkaProducer,构造方式如下:

    Properties props = new Properties();
    // kafka集群地址
    props.put("bootstrap.servers", "10.0.55.229:9092");
    // kafka消息key的序列化方式
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // kafka消息value的序列化方式
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
    

    消息

    -创建生产者
    KafkaProducer构造好后,需要构造待发送的消息。kafka消息对象是ProducerRecord,根据源码可知,构造方式有多种:

    public class ProducerRecord<K, V> {
    
        /**
         * 所有构造方法最后都是调用这个构造方法, 所以弄明白这个构造方法所有参数含义就可以了
         * Creates a record with a specified timestamp to be sent to a specified topic and partition
         * @param topic - The topic the record will be appended to, topic名称
         * @param partition - The partition to which the record should be sent, 消息发送的目标分区名称, 如果不指定, kafka会根据Partitioner计算目标分区
         * @param timestamp - The timestamp of the record, in milliseconds since epoch. If null, the producer will assign
         *                  the timestamp using System.currentTimeMillis(). 消息发送的指定时间戳, 默认为当前时间
         * @param key - The key that will be included in the record, 消息的key, kafka根据这个key计算分区
         * @param value - The record contents 消息的内容
         * @param headers - the headers that will be included in the record
         */
        public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
            // topic是构造ProducerRecord的必传参数
            if (topic == null)
                throw new IllegalArgumentException("Topic cannot be null.");
            // 发送的时间戳不能为负数
            if (timestamp != null && timestamp < 0)
                throw new IllegalArgumentException(
                        String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
            // 分区值不能为负数
            if (partition != null && partition < 0)
                throw new IllegalArgumentException(
                        String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = new RecordHeaders(headers);
        }
    
        public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
            this(topic, partition, timestamp, key, value, null);
        }
    
        public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
            this(topic, partition, null, key, value, headers);
        }
        
        public ProducerRecord(String topic, Integer partition, K key, V value) {
            this(topic, partition, null, key, value, null);
        }
        
        public ProducerRecord(String topic, K key, V value) {
            this(topic, null, null, key, value, null);
        }
        
        public ProducerRecord(String topic, V value) {
            this(topic, null, null, null, value, null);
        }
        
        ... ...
    }
    
    • 创建消息
      下面构造一个最常用的ProducerRecord,只指定topic和value,由kafka去决定分区:
    // ProducerRecord就是发送的信息对象, 包括: topic名称, key(可选), value(发送的内容)
    // key的用途主要是:消息的附加信息,用来决定消息被写到哪个分区,拥有相同key的消息会被写到同一个分区
    ProducerRecord<String, String> record = new ProducerRecord<>("ORDER-DETAIL",
        JSON.toJSONString(new Order(201806260001L, new Date(), 98000, "desc", "165120001")));
    

    消费者

    • 创建消费者
      kafka消费者者对象就是KafkaConsumer,构造方式如下:
    Properties props = new Properties();
    // kafka集群地址
    props.put("bootstrap.servers", "10.0.55.229:9092");
    // ConsumerGroup即消费者组名称
    props.put("group.id", "afei");
    // kafka消息key的反序列化方式
    props.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
    // kafka消息value的序列化方式
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    
    • 订阅并消费
    // 订阅的topic名称
    kafkaConsumer.subscribe(Lists.newArrayList("ORDER-DETAIL"));
    try {
        while (true) {
            // 消费者必须持续从kafka进行轮询, 否则会被认为死亡, 从而导致它处理的分区被交给同一ConsumerGroup的其他消费者
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            // 为了防止消费者被认为死亡, 需要尽可能确保处理消息工作尽快完成
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("message content: "+GSON.toJson(record));
                System.out.println("message value  : "+record.value());
            }
            // 每次消费完后异步提交
            kafkaConsumer.commitAsync();
        }
    }finally {
        // 消费者关闭之前调用同步提交
        kafkaConsumer.commitSync();
        kafkaConsumer.close();
    }
    

    集成spring方式

    现在的项目一般标配了spring,通过spring集成kafka能够大大的方便业务开发。集成方式也比较简单,只需增加如下maven坐标:

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <version>3.0.3.RELEASE</version>
    </dependency>
    

    生产者

    • 定义生产者
      spring集成kafka的生产者配置方式如下(部分属性配置通过properties解耦,用户使用时可以自定义):
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!-- 定义producer的参数 -->
        <bean id="producerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
                    <entry key="retries" value="${kafka.reties}" />
                    <entry key="retry.backoff.ms" value="${kafka.retry.backoff.ms}" />
                    <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
                    <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
                </map>
            </constructor-arg>
        </bean>
    
        <!-- 创建kafkatemplate需要使用的producerfactory bean -->
        <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg ref="producerProperties"/>
        </bean>
    
        <!-- 创建KafkaTemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
        <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
            <constructor-arg ref="producerFactory" />
            <constructor-arg name="autoFlush" value="true" />
            <property name="defaultTopic" value="defaultTopic" />
        </bean>
    
    </beans>
    
    • 发送消息
      发送消息进行如下封装,封装后如果要发送kafka消息,只需一行代码即可,例如kafkaProducer.send(topicName, obj);(obj就是要发送的消息对象):
    /**
     * @author afei
     * @version 1.0.0
     * @since 2018年06月11日
     */
    @Component
    public class MyKafkaProducer {
    
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
        // 发送的消息统一通过google-gson序列化
        private static final Gson GSON = new Gson();
        // KafkaTemplate就是上面xml文件中定义的bean
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public boolean send(String topic, String key, Object msg){
            // 将发送的消息序列化为json
            String json = toJsonString(msg);
            try {
                ListenableFuture<SendResult<String, String>> futureResult = kafkaTemplate.send(
                        topic, key, json);
                logger.info("Kafka send json: {}, topicName: {}", json, topic);
                SendResult<String, String> result = futureResult.get();
                // 这里的输出日志, 不能用fastjson, fastjson默认依赖bean的setter/getter方法,
                // 而SendResult中的RecordMetadata的属性并没有setter/getter方法
                logger.info("Kafka send result: {}", GSON.toJson(result));
                return result!=null;
            } catch (Throwable e) {
                logger.error("Kafka send failed.", e);
            }
            return false;
        }
    
        public boolean send(String topic, Object msg){
            return send(topic, null, msg);
        }
    
        private String toJsonString(Object o) {
            String value;
            if (o instanceof String) {
                value = (String) o;
            } else {
                value = JSON.toJSONString(o);
            }
            return value;
        }
    }
    

    消费者

    • 定义消费者
      spring集成kafka的消费者配置方式如下(部分属性配置通过properties解耦,用户使用时可以自定义):
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!-- 定义consumer的参数 -->
        <bean id="consumerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
                    <entry key="retries" value="${kafka.reties}" />
                    <entry key="retry.backoff.ms" value="${kafka.retry.backoff.ms}" />
                    <!-- 是否自动提交 -->
                    <entry key="enable.auto.commit" value="${kafka.enable.auto.commit}"/>
                    <!-- 自动提交的间隔时间 -->
                    <entry key="auto.commit.interval.ms" value="${kafka.auto.commit.interval.ms}"/>
                    <!-- 重启后是否从最新的offset地址消费 -->
                    <entry key="auto.offset.reset" value="${kafka.auto.offset.reset}"/>
                    <entry key="key.deserializer"
                           value="org.apache.kafka.common.serialization.StringDeserializer" />
                    <entry key="value.deserializer"
                           value="org.apache.kafka.common.serialization.StringDeserializer" />
                </map>
            </constructor-arg>
        </bean>
    
        <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg ref="consumerProperties"/>
        </bean>
    
        <bean id="containerProperties_openLevel3Account" class="org.springframework.kafka.listener.config.ContainerProperties">
            // 消费者消费的topic名称
            <constructor-arg value="${kafka.topic.name}"/>
            // 以开户为例, 消息由OpenAccountKafkaListener处理
            <property name="messageListener" ref="openAccountKafkaListener"/>
            // 即表示ConsumerGroup的groupId
            <property name="groupId" value="${kafka.group.id}"/>
        </bean>
    
        <bean id="messageListenerContainer_openAccount" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
            <constructor-arg ref="consumerFactory"/>
            <constructor-arg ref="containerProperties_openAccount"/>
        </bean>
    </beans>
    
    • 消息处理
      由上面的配置可以,${kafka.topic.name}指定的topic,其消息由OpenAccountKafkaListener处理,OpenAccountKafkaListener的核心源码如下:
    /**
     * 钱包开户后送积分
     * @author afei
     * @version 1.0.0
     * @since 2018年06月26日
     */
    @Component
    public class OpenAccountKafkaListener implements MessageListener<String, String> {
    
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            logger.debug("msg: {}", record.value());
            // 拿到消息后, 反序列化为OpenAccount
            OpenAccount mqInput = JSON.parseObject(record.value(), OpenAccount.class);
            
            //TODO 拿到开户信息后, 可以送积分, 送优惠券等
        }
    }
    

    显而易见,spring集成kafka后,消费端的简单的很多。另外,我们在没用使用spring集成kafka时可以拿到kafak消费者异步提交,也可以同步提交,但是集成spring后,如何实现呢?客官老爷们稍安勿躁,继续往下看。

    上面的开发消息监听器OpenAccountKafkaListener实现了接口org.springframework.kafka.listener.MessageListener,有另外一个接口org.springframework.kafka.listener.ConsumerAwareMessageListener实现了这个接口,这个接口源码如下:

    @FunctionalInterface
    public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    
        // 看这个方法的定义,增加了"default",即我们的业务类如果实现这个这个接口,就不需要实现这个接口,而只需要实现下面的接口即可,下面的接口有Consumer,我们就能主动执行同步提交或者异步提交了
        @Override
        default void onMessage(ConsumerRecord<K, V> data) {
            throw new UnsupportedOperationException("Container should never call this");
        }
    
        @Override
        void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
    
    }
    
    • 消息处理(第二版)
    @Component
    public class OpenAccountKafkaListener  implements ConsumerAwareMessageListener<String, String> {
    
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @Override
        public void onMessage(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
            logger.info("Receive msg with consumer: {}", record.value());
            OpenAccount mqInput = JSON.parseObject(record.value(), OpenAccount.class);
    
            try {
                //TODO 拿到开户信息后, 可以送积分, 送优惠券等
            } finally {
                // 消息处理完后异步提交
                consumer.commitAsync((offsets, exception) -> {
                    if (exception==null){
                        // offsets需要用gson序列化输出
                        logger.info("The offset info of commit async: {}", GsonUtils.format(offsets));
                    }else{
                        logger.error("Commit async failed. ", exception);
                    }
                });
            }
        }
    }
    

    深入发送消息

    前面已经介绍了如何使用kafka生产者发送消息,以及如何用消费者接收消息,包括原生方式和spring集成方式,接下来我们跟踪源码看看消息在调用KafkaProducer中的send()后发送到kafka broker之前需要经过哪些处理。

    • 拦截器
      无论是同步调用send(),还是异步调用send()发送消息,最终都是调用下面的方法:
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
    

    由这段代码可知,消息发送前第一步就是调用拦截器(如果有的话),拦截器可以对消息进行加工。后面会单独有一篇文章详细的分析拦截器。

    接下来调用doSend()方法,源码如下:

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // first make sure the metadata for the topic is available
            // 得到集群信息和已经消耗的时间
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            // 根据参数max.block.ms和已经消息的时间差,得到剩余时间
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            // 集群信息本地变量化
            Cluster cluster = clusterAndWaitTime.cluster;
            
            // 序列化key
            byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            // 序列化value
            byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            
            // 选择分区
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
    
            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            // 发送的消息size不允许超过max.request.size和buffer.memory两个参数的值
            ensureValidRecordSize(serializedSize);
            // 得到消息发送时间,默认为当前时间,除非构造ProducerRecord时指定了timestamp
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    
            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);
    
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (Exception e) {
            this.errors.record();
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            // 如果发送消息时有异常,那么调用所有拦截器上的onAcknowledgement()方法,所以通过拦截器种onAcknowledgement()方法的exception是否为空,判断消息是否发送成功,从而可以统计发送成功率
            this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }
    
    • 获取集群信息
      由doSend()方法源码可知,获取集群信息的源码就在waitOnMetadata()中,其源码如下:
    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        metadata.add(topic);
        // 第一次获取集群信息,初始化的集群信息为bootstrap.servers参数指定的集群信息
        Cluster cluster = metadata.fetch();
        // 从缓存的主题&分区信息map(Map<String, List<PartitionInfo>>)中获取分区总数
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            // 如果已经缓存了,那么直接返回,且因为没有花时间在获取集群信息上,所以构造方法的第二个参数为0
            return new ClusterAndWaitTime(cluster, 0);
        // 记下开始时间(为了计算获取集群信息消耗的时间)
        long begin = time.milliseconds();
        // 初始化剩余时间就是max.block.ms参数指定的时间(即获取集群信息最大允许阻塞时间,官方文档指KafkaProducer.send() and KafkaProducer.partitionsFor()两步的时间差)
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        
        do {
            log.trace("Requesting metadata update for topic {}.", topic);
            metadata.add(topic);
            // needUpdate置为true,并返回版本
            int version = metadata.requestUpdate();
            sender.wakeup();
            try {
                // 等待元数据信息更新,直到当前版本号超过上一次版本号version。另外,这个更新过程不能耗时不允许超过remainingWaitMs
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            // 更新后,再次获取集群信息
            cluster = metadata.fetch();
            // 计算此次更新过程耗时
            elapsed = time.milliseconds() - begin;
            // 如果耗时超过了max.block.ms参数指定的时间,那么抛出异常
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            if (cluster.unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            // 根据刚才计算的此次更新消耗的时间,计算剩余时间
            remainingWaitMs = maxWaitMs - elapsed;
            // 得到这个topic的分区数
            partitionsCount = cluster.partitionCountForTopic(topic);
            // 如果这个topic分区数获取失败,那么继续获取,直到耗尽max.block.ms指定的时间
        } while (partitionsCount == null);
    
        // 如果构造ProducerRecord时指定了分区,且指定的值大于或等于分区数,那么抛出异常(例如,名为"ORDER-DETAIL"的topic,有7个分区,如果构造ProducerRecord时指定了partition的值且为7或者大于7,那么就会抛出这个异常,异常信息为:Invalid partition given with record: 7 is not in the range [0...7).)
        if (partition != null && partition >= partitionsCount) {
            throw new KafkaException(
                    String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
        }
    
        // 返回集群信息和剩余时间
        return new ClusterAndWaitTime(cluster, elapsed);
    }
    

    通过这段源码分析可知,当我们构造KafkaProducer时指定的bootstrap.servers的值,不一定要和kafka集群信息完全一致,kafka-client可以通过参数bootstrap.servers指定的broker,然后从broker上获取到整个kafka集群元数据信息。但是即使是这样,参数bootstrap.servers也建议尽量完整。例如整个集群有3个broker,如果bootstrap.servers只指定了1个broker,那么当这个broker宕机后,虽然集群状态可用。但是

    • 序列化
      即经过拦截器链后另一个非常重要的操作:对key&value的序列化。核心代码是如下两行,对key的序列化,调用的方法是构造KafkaProducer时参数key.serializer指定的serializer,对value的序列化,调用的方法是构造KafkaProducer时参数value.serializer指定的serializer:
    keySerializer.serialize(record.topic(), record.headers(), record.key());
    valueSerializer.serialize(record.topic(), record.headers(), record.value());
    
    • 分区
      接下来就是选择分区,核心代码如下:
    int partition = partition(record, serializedKey, serializedValue, cluster);
    
    • 总结
      根据上面的分析可知,消息发送经过的几个重要过程按照先后顺序依次是:拦截器,获取元数据,序列化,选择分区。接下来的文章会一一详细分析这些必要重要的过程。

    相关文章

      网友评论

        本文标题:4. kafka生产者&消费者

        本文链接:https://www.haomeiwen.com/subject/vidnyftx.html