美文网首页
kafka服务端0.10.2,客户端2.0 如何发送消息

kafka服务端0.10.2,客户端2.0 如何发送消息

作者: 味道_3a01 | 来源:发表于2021-06-10 10:32 被阅读0次

    项目中用到kafka,应用作为生产者,发送消息时,报错:

    Magic v1 does not support record headers
    

    网上也有很多同样的博客记录这个错误,如:https://zhuanlan.zhihu.com/p/205676507

    原因是 Kafka服务端和客户端的版本不兼容导致了报错;

    应用环境:spring boot 版本:2.1.6.RELEASE

    查看客户端kafka版本:2.2.7.RELEASE


    spring-kafka-2.7.0.png

    服务端版本:0.10.2


    kafka-server.png

    到这里确定是版本不一致导致的,如何解决呢?

    方案一:

    服务端升级到2.0

    由于使用的是 阿里云的消息队列kafka,2.0费用比当前版本贵很多,故该方案pass

    方案二:

    客户端版本使用1.0的

    使用原生的kafka,即自己写kafka configuration,由于我们还要消费kafka的消息,故还需要写一个监听器,用来消费消息,比较麻烦;换个思路:消费消息仍然使用 spring-kafka,发送消息使用kafka原生API

    1. maven依赖
    <!--kafka-->
    <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka</artifactId>
    </dependency>
    

    没有version,查看maven依赖,发现spring boot 帮我们管理了spring-kafka的版本


    spring-kafka.png

    Ctrl + f 查找 spring-kafka.version,如下图:


    spring-kafka-2.png
    1. KafkaConfig配置类
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.List;
    import java.util.Properties;
    
    /**
     * kafka 配置
     *   由于kafka服务端使用的是 0.10.2版本,而我们的框架使用的spring boot版本为2.4.5,
     *   kafka starter版本中kafka-clients版本为2.6.0;kafka客户端作为生产者发送消息
     *   时,会增加请求头,而服务端不支持,导致无法正常生产消息,故kafka作为生产者,不能直
     *   接使用boot starter,需使用kafka原生客户端
     * @author guodong.zhang
     */
    @Configuration
    public class KafkaConfig {
    
        @Value("#{'${spring.kafka.consumer.bootstrap-servers}'.split(',')}")
        private List<String> bootstrapServers;
    
        @Bean
        public KafkaProducer<String, String> initKafkaTemplate() {
            Properties props = new Properties();
            //设置接入点,请通过控制台获取对应Topic的接入点
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    
            //Kafka消息的序列化方式
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            //请求的最长等待时间
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
            //设置客户端内部重试次数
            props.put(ProducerConfig.RETRIES_CONFIG, 5);
            //设置客户端内部重试间隔
            props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
            //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
            //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            return producer;
        }
    }
    
    1. 生产消息类
    
    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang.StringUtils;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.util.concurrent.Future;
    
    /**
     * 生产者,推送消息至kafka
     *
     * @author guodong.zhang
     */
    @Slf4j
    @Component
    public class MessageSenderClient {
    
    
        @Resource
        private KafkaProducer kafkaProducer;
    
        /**
         * 消息发送至 kafka
         *
         * @param topic 路由
         * @param data 消息内容
         */
        public void send(String topic, Object data) {
            if (StringUtils.isEmpty(topic) || data == null) {
                throw new IllegalStateException("发送消息参数不能为空");
            }
            try {
                ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<>(topic, JSON.toJSONString(data));
                Future<RecordMetadata> metadataFuture = kafkaProducer.send(kafkaMessage);
                RecordMetadata recordMetadata = metadataFuture.get();
                log.info("MessageSenderClient kafka send Produce ok:{}", recordMetadata.toString());
            } catch (Exception e) {
                log.info("MessageSenderClient kafka send error,", e);
            }
        }
    
        /**
         * 消息发送至 kafka
         *
         * @param topic
         * @param partition 分区
         * @param data
         */
        public void send(String topic, Integer partition, Object data) {
            if (StringUtils.isEmpty(topic) || data == null) {
                throw new IllegalStateException("发送消息参数不能为空");
            }
            if (partition == null) {
                send(topic, data);
            }
            try {
                ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<>(topic, partition, null, JSON.toJSONString(data));
                Future<RecordMetadata> metadataFuture = kafkaProducer.send(kafkaMessage);
                RecordMetadata recordMetadata = metadataFuture.get();
                log.info("MessageSenderClient kafka send Produce ok:{}", recordMetadata.toString());
            } catch (Exception e) {
                log.info("MessageSenderClient kafka send error,", e);
            }
        }
    }
    
    1. 需要发送消息的地方调用该方法
    
        @Value("${kafka.producer.job_collection_es_topic}")
        private String topic;
    
        @Autowired
        private MessageSenderClient kafkaProducer;
    
        private void sendMsg(Object target) {
            if (target == null || ((JSONObject) target).get(BIZ_ID) == null) {
                throw new BusinessException("发送消息参数不能为空");
            }
            String bizId = ((JSONObject) target).get(BIZ_ID).toString();
            int partition = Integer.parseInt(bizId) % PARTITION_NUM;
            // 推送到kafka
            kafkaProducer.send(topic, partition, target);
        }
    
    1. 测试验证


      msg.png

      疑问:

    kafka客户端与服务端版本需要对应,而我这里kafka-clients使用的版本是 2.0.1,却可以正常发送消息


    kafka-client-2.0.1.jpg

    相关文章

      网友评论

          本文标题:kafka服务端0.10.2,客户端2.0 如何发送消息

          本文链接:https://www.haomeiwen.com/subject/tyzosltx.html