美文网首页
Spring-boot整合Kafka

Spring-boot整合Kafka

作者: 枯木风 | 来源:发表于2018-12-02 23:17 被阅读0次

    生产者

    说明

    KafkaTemplate封装了一个生产者(Producer),并提供了方便的方法来发送数据到kafka主题。它同时提供了异步和同步的发送方式,异步方法返回一个Future。

    其发送方法有:

        // Overloads without a topic argument: they rely on a default topic configured on the template.
        ListenableFuture<SendResult<K, V>> sendDefault(V data);
    
        ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
    
        ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data);
    
        // Overloads that take the target topic explicitly, optionally with a partition and/or a key.
        ListenableFuture<SendResult<K, V>> send(String topic, V data);
    
        ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
    
        ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data);
    
        ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data);
    
        // Overload taking a Message<?>; presumably the topic/key travel in the message headers — confirm in the Spring Kafka docs.
        ListenableFuture<SendResult<K, V>> send(Message<?> message);
    

    前3个方法(sendDefault)需要预先为KafkaTemplate设置默认主题

    配置

    使用Producer配置类

    /**
     * Spring configuration that exposes the {@code KafkaTemplate<String, String>} bean used to
     * publish messages.
     *
     * <p>Every tunable value is injected from the {@code kafka.producer.*} properties.
     *
     * <p>NOTE(review): {@code Config} is not declared in this snippet. It presumably stands for
     * Kafka's producer config constants holder, which cannot be referenced by its simple name here
     * because this class is itself called {@code ProducerConfig} — confirm and fully qualify it.
     */
    @Configuration
    @EnableKafka
    public class ProducerConfig {

        @Value("${kafka.producer.servers}")
        private String servers;

        @Value("${kafka.producer.retries}")
        private int retries;

        @Value("${kafka.producer.batch.size}")
        private int batchSize;

        @Value("${kafka.producer.linger}")
        private int linger;

        @Value("${kafka.producer.buffer.memory}")
        private int bufferMemory;

        /**
         * Collects the settings handed to the producer factory.
         *
         * @return a new mutable map holding the producer configuration entries
         */
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(Config.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(Config.RETRIES_CONFIG, retries);
            props.put(Config.BATCH_SIZE_CONFIG, batchSize);
            props.put(Config.LINGER_MS_CONFIG, linger);
            props.put(Config.BUFFER_MEMORY_CONFIG, bufferMemory);
            // Keys and values are both serialized as strings; each serializer is registered once.
            props.put(Config.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(Config.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }

        /**
         * Builds the producer factory from {@link #producerConfigs()}.
         *
         * @return a new {@code DefaultKafkaProducerFactory} for String keys and values
         */
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        /**
         * Registers the template that application code injects to send messages.
         *
         * @return a {@code KafkaTemplate} backed by {@link #producerFactory()}
         */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
    

    示例

    /**
     * REST endpoint that forwards a request parameter to Kafka through the injected
     * {@code KafkaTemplate}.
     */
    @RestController
    @RequestMapping("/kafka/producer")
    public class ProducerController {
        private static final Logger logger = LoggerFactory.getLogger(ProducerController.class);

        @Value("${topic.name}")
        private String topicName;

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        /**
         * Publishes {@code message} to the configured topic under a random UUID key.
         *
         * <p>The send is asynchronous, so {@code "success"} only means the record was handed to
         * the template; a later delivery failure is reported through the failure callback.
         *
         * @param message the payload to publish
         * @return {@code "success"} when the record was handed off, {@code "fail"} when the
         *         hand-off itself threw
         */
        @RequestMapping("/send")
        public Object sendKafka(String message) {
            try {
                logger.info("send kafka message: {}", message);
                // The returned future completes later; without a callback an asynchronous
                // delivery failure would never reach the catch block below and would be lost.
                kafkaTemplate.send(topicName, UUID.randomUUID().toString(), message)
                        .addCallback(
                                result -> { },
                                ex -> logger.error("发送kafka失败", ex));
                return "success";
            } catch (Exception e) {
                logger.error("发送kafka失败", e);
                return "fail";
            }
        }
    }
    

    消费者

    说明

    可以通过配置MessageListenerContainer并提供MessageListener,或者通过使用@KafkaListener注解来接收消息。
    MessageListenerContainer有两个实现:

    • KafkaMessageListenerContainer:从单个线程上的所有主题/分区接收所有消息
    • ConcurrentMessageListenerContainer:委托给1个或多个KafkaMessageListenerContainer以提供多线程消费。通过container.setConcurrency(3),来设置多个线程

    配置

    使用Consumer配置类

    /**
     * Spring configuration that wires a Kafka listener container consuming String records.
     *
     * <p>All values are injected from the {@code kafka.consumer.*} properties.
     *
     * <p>NOTE(review): {@code Config} is not declared in this snippet. It presumably stands for
     * Kafka's consumer config constants holder, which cannot be referenced by its simple name here
     * because this class is itself called {@code ConsumerConfig} — confirm.
     */
    @Configuration
    @EnableKafka
    public class ConsumerConfig {

        @Value("${kafka.consumer.servers}")
        private String servers;

        @Value("${kafka.consumer.group.id}")
        private String groupId;

        @Value("${kafka.consumer.topic}")
        private String topic;

        @Value("${kafka.consumer.enable.auto.commit}")
        private boolean enableAutoCommit;

        @Value("${kafka.consumer.auto.commit.interval}")
        private String autoCommitInterval;

        @Value("${kafka.consumer.session.timeout}")
        private String sessionTimeout;

        @Value("${kafka.consumer.auto.offset.reset}")
        private String autoOffsetReset;

        @Value("${kafka.consumer.concurrency}")
        private int concurrency;

        /**
         * Assembles the consumer settings from the injected properties.
         *
         * @return a new mutable map holding the consumer configuration entries
         */
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> settings = new HashMap<>();
            settings.put(Config.BOOTSTRAP_SERVERS_CONFIG, servers);
            settings.put(Config.GROUP_ID_CONFIG, groupId);
            settings.put(Config.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            settings.put(Config.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            settings.put(Config.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            settings.put(Config.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            settings.put(Config.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            settings.put(Config.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return settings;
        }

        /**
         * Builds the consumer factory from {@link #consumerConfigs()}.
         *
         * @return a new {@code DefaultKafkaConsumerFactory} for String keys and values
         */
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        /**
         * Supplies the listener that receives the consumed records.
         *
         * @return a new {@code CustomMessageListener}
         */
        public MessageListener<String, String> messageListener() {
            return new CustomMessageListener();
        }

        /**
         * Binds the configured topic and the message listener into the container properties.
         *
         * @return container properties for the configured topic
         */
        public ContainerProperties containerProperties() {
            ContainerProperties properties = new ContainerProperties(topic);
            properties.setMessageListener(messageListener());
            return properties;
        }

        // Single-threaded alternative: a KafkaMessageListenerContainer receives all messages
        // from all topics/partitions on one thread.
        //
        // @Bean(initMethod = "doStart")
        // public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer() {
        //     KafkaMessageListenerContainer<String, String> container =
        //             new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties());
        //     return container;
        // }

        /**
         * Registers the concurrent container, which delegates to one or more
         * {@code KafkaMessageListenerContainer} instances to consume on multiple threads.
         * The thread count comes from the {@code kafka.consumer.concurrency} property.
         *
         * @return the container, with {@code doStart} registered as its bean init method
         */
        @Bean(initMethod = "doStart")
        public ConcurrentMessageListenerContainer<String, String> concurrentMessageListenerContainer() {
            ConcurrentMessageListenerContainer<String, String> listenerContainer =
                    new ConcurrentMessageListenerContainer<>(consumerFactory(), containerProperties());
            listenerContainer.setConcurrency(concurrency);
            return listenerContainer;
        }
    }
    

    消息接收

    Java实现

    直接使用kafka0.10 client去收发消息

    /**
     * Consumes {@code topic} with a plain {@code KafkaConsumer} in an endless poll loop and
     * prints every record it receives. Exceptions raised inside the loop are printed, and the
     * consumer is closed afterwards.
     */
    @Test
    public void receive() {
        Properties settings = new Properties();
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        settings.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // The resource block closes the consumer once the inner try/catch has finished, so a
        // failure inside the loop is printed first and the consumer is closed afterwards.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
            try {
                consumer.subscribe(Arrays.asList(topic));
                while (true) {
                    ConsumerRecords<String, String> batch = consumer.poll(10000);
                    batch.forEach(record -> System.out.printf(
                            "client : %s , topic: %s , partition: %d , offset = %d, key = %s, value = %s%n",
                            clientId, record.topic(), record.partition(), record.offset(),
                            record.key(), record.value()));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    使用MessageListener接口

    实现MessageListener接口

    /**
     * Message listener that logs the key and value of every record it receives.
     *
     * <p>Keys are typed as {@code String}: {@code ConsumerConfig.messageListener()} returns this
     * class as a {@code MessageListener<String, String>} and the consumer is configured with
     * {@code StringDeserializer} for keys, so an {@code Integer} key type would not match.
     */
    public class CustomMessageListener implements MessageListener<String, String> {
        private static final Logger logger = LoggerFactory.getLogger(CustomMessageListener.class);

        /**
         * Logs the key and value of the received record.
         *
         * @param data the consumed record
         */
        @Override
        public void onMessage(ConsumerRecord<String, String> data) {
            logger.info("received key: {}, value: {}", data.key(), data.value());
        }

        // Alternatively, implement the onMessage variant that also receives the consumer,
        // in order to commit offsets manually.
    }
    
    使用@KafkaListener注解
    // Listener registered with id "foo" on topic "myTopic"; the handler parameter is declared
    // as a String. The method body is elided in this example.
    @KafkaListener(id = "foo", topics = "myTopic")
    public void listen(String data) {
         ...
    }
    
    // Listener registered with id "bar" that names its topic partitions explicitly:
    // partitions "0" and "1" of topic1, and for topic2 partition "0" plus a partition offset
    // entry for partition "1" with initialOffset "100".
    // The handler takes the ConsumerRecord itself; the method body is elided in this example.
    @KafkaListener(id = "bar", topicPartitions =
            { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
              @TopicPartition(topic = "topic2", partitions = "0",
                 partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
            })
    public void listen(ConsumerRecord<?, ?> record) {
        ...
    }
    

    总结

    • 对于生产者来说,封装KafkaProducer到KafkaTemplate相对简单
    • 对于消费者来说,由于spring是采用注解的形式去标注消息处理方法,所以处理流程相对复杂一些:
      1. 先在KafkaListenerAnnotationBeanPostProcessor中扫描bean,然后注册到KafkaListenerEndpointRegistrar
      2. 而KafkaListenerEndpointRegistrar在afterPropertiesSet的时候去创建MessageListenerContainer
      3. messageListener包含了原始endpoint携带的bean以及method转换成的InvocableHandlerMethod
      4. ConcurrentMessageListenerContainer这个衔接上,根据配置的spring.kafka.listener.concurrency来生成多个并发的KafkaMessageListenerContainer实例
      5. 每个KafkaMessageListenerContainer都自己创建一个ListenerConsumer,然后自己创建一个独立的kafka consumer,每个ListenerConsumer在线程池里头运行,这样来实现并发
      6. 每个ListenerConsumer里头都有一个recordsToProcess队列,从原始的kafka consumer poll出来的记录会放到这个队列里头,
      7. 然后有一个ListenerInvoker线程循环超时等待从recordsToProcess取出记录,然后调用messageListener的onMessage方法(即KafkaListener注解标注的方法)
    项目源码

    https://github.com/scjqwe/spring-kafka-examples

    参考

    相关文章

      网友评论

          本文标题:Spring-boot整合Kafka

          本文链接:https://www.haomeiwen.com/subject/gxxecqtx.html