美文网首页
kafka 优雅实践

kafka 优雅实践

作者: wyh1791 | 来源:发表于2019-10-11 20:31 被阅读0次

    本文分别从生产端和消费端分别说明

    1.生产端优化

    生产端通过如下提高并发和可靠性

    • 设置大缓冲区100M

    • 缓冲区延迟1s发送

    • 缓冲区最大批量200000

    • 发送端消息进行压缩

    • 失败重试1000次

    2.消费端优化(针对图片处理)

    消费端通过如下方式进行优化

    • 消息手动确认, 提高可靠性, 避免消息丢失

    • 一次只拉去一个消息, 因为图片处理慢, 所以一次只拉去一个消息

    • 拉取一次最大处理时间30min(保证图片处理时间充足)

    • 避免reblance 超时时间5min, 心跳间隔3s, 超过5min没有心跳才reblance

    • 任何情况必须ack消息

    • 并发控制为5

    • kafka用户名&密码配置化, 去除秘钥文件

    • toptic 统一配置

    
    #kafka基础配置,不要变动
    
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    spring.kafka.producer.acks=1
    
    spring.kafka.producer.retries=1000
    
    spring.kafka.producer.compression-type=gzip
    
    spring.kafka.producer.properties.linger.ms=1000
    
    spring.kafka.producer.properties.batch.size=200000
    
    spring.kafka.producer.properties.max.block.ms=600000
    
    spring.kafka.producer.properties.buffer.memory=100554432
    
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
    spring.kafka.consumer.enable-auto-commit=false
    
    spring.kafka.consumer.max-poll-records=1
    
    spring.kafka.consumer.auto-offset-reset=earliest
    
    spring.kafka.consumer.properties.max.poll.interval.ms=1800000
    
    spring.kafka.consumer.properties.rebalance.timeout.ms=300000
    
    spring.kafka.consumer.properties.session.timeout.ms=300000
    
    spring.kafka.consumer.properties.heartbeat.interval.ms=3000
    
    spring.kafka.properties.security.protocol=SASL_PLAINTEXT
    
    spring.kafka.properties.sasl.mechanism=PLAIN
    
    spring.kafka.properties.request.timeout.ms=301000
    
    spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
    
    spring.kafka.listener.concurrency=5
    
    
    
    #kafka账号
    
    spring.kafka.bootstrap-servers=172.16.97.161:2093
    
    spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="用户名" password="密码";
    
    #kafka topic
    
    spring.topics.imageTransformMessage.topic=image_transform_message_prod
    
    spring.topics.imageTransformMessage.group=image_transform_message_prod
    
    spring.topics.imageTransformResult.topic=image_transform_result_prod
    
    spring.topics.imageTransformResult.group=image_transform_result_prod
    
    

    3.代码实例

    3.1发送端代码

    
    @Slf4j
    
    @Component
    
    public class KafkaProducer {
    
        @Autowired
    
        private KafkaTemplate kafkaTemplate;
    
        @Value("${spring.topics.imageTransformResult.topic}")
    
        private String topic;
    
        /**
    
        * 相同的key发到同一个partition
    
        * <p>
    
        * 由于kafka是根据key的hash值取模去分的partition 导致肯能分布不均,所以此处随机去partition的值
    
        * @param key
    
        * @param data
    
        * @param <T>
    
        * @return
    
        */
    
        public <T> boolean sendMessage(String key, T data) {
    
            String jsonData = JSONObject.toJSONString(data);
    
            UUID uuid = UUID.randomUUID();
    
            String suuid = StringUtils.remove(uuid.toString(), "-");
    
            try {
    
                int partitionSize = kafkaTemplate.partitionsFor(topic).size();
    
                int randomPartition = (int) (System.currentTimeMillis() % partitionSize);
    
                Header header = new RecordHeader("UUID", suuid.getBytes());
    
                ProducerRecord producerRecord = new ProducerRecord(topic, randomPartition, key, jsonData, Arrays.asList(header));
    
                log.info("begin send key {}, data {}, uuid {}", key, data, suuid);
    
                kafkaTemplate.send(producerRecord);
    
                log.info("after send uuid {}", suuid);
    
                return true;
    
            } catch (Exception e) {
    
                log.error("sendMessage error, suuid {}, key {}, data {}", suuid, key, jsonData, e);
    
                String message = "商品同步kafka消息发送失败:" + suuid;
    
                return false;
    
            }
    
        }
    
    }
    
    

    3.2消费端代码

    
    @Slf4j
    
    @Component
    
    public class KafkaConsumer {
    
        @Autowired
    
        private ImageBiz imageBiz;
    
        @Autowired
    
        private KafkaProducer kafkaProducer;
    
        @KafkaListener(topics = "#{'${spring.topics.imageTransformMessage.topic}'}", groupId = "#{'${spring.topics.imageTransformMessage.group}'}")
    
        public void processMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
    
            Stopwatch stopwatch = Stopwatch.createStarted();
    
            try {
    
                String key = record.key();
    
                String data = record.value();
    
                log.info("kafka receive message, key {}, data {}", key, data);
    
                if (BaseUtil.isEmpty(data)) {
    
                    acknowledgment.acknowledge();
    
                    return;
    
                }
    
                acknowledgment.acknowledge();
    
            } catch (Exception e) {
    
                String suuid = MDC.get("UUID") == null ? "" : MDC.get("UUID");
    
                log.error("消费消息异常,请线上查找原因: {}", suuid, e);
    
            }
    
        }
    
    }
    
    

    4.消费端场景

    kafka消费端主要有两种场景

    1. 消息数量不多, 但是处理每一消息的时间比较长

    2. 消息数量很多, 处理每一个消息的时间很短

    场景1

    和上面介绍的图片处理类似, 每次拉去少量消息, 给消息处理留足够时间

    场景2

    场景2可以转化为场景1, 把原来的1000个消息组织为一个消息, 批量处理

    如果发送方很分散, 并且只能一个个的发消息, 可以使用批量监听消息

    配置修改

    
    #一次拉取1000消息
    
    spring.kafka.consumer.max-poll-records=1000
    
    #批量消费模式
    
    spring.kafka.listener.type=batch
    
    

    消费端代码

    
    @Slf4j
    
    @Component
    
    public class KafkaConsumer {
    
        @Autowired
    
        private ImageBiz imageBiz;
    
        @Autowired
    
        private KafkaProducer kafkaProducer;
    
        @KafkaListener(topics = "#{'${spring.topics.imageTransformMessage.topic}'}", groupId = "#{'${spring.topics.imageTransformMessage.group}'}")
    
        public void processMessage(List<ConsumerRecord<String, String>> record, Acknowledgment acknowledgment) {
    
            Stopwatch stopwatch = Stopwatch.createStarted();
    
            try {
    
                acknowledgment.acknowledge();
    
            } catch (Exception e) {
    
                String suuid = MDC.get("UUID") == null ? "" : MDC.get("UUID");
    
                log.error("消费消息异常,请线上查找原因: {}", suuid, e);
    
            }
    
        }
    
    }
    
    

    参考:

    相关文章

      网友评论

          本文标题:kafka 优雅实践

          本文链接:https://www.haomeiwen.com/subject/lwixmctx.html