美文网首页
Springboot整合kafka的使用

Springboot整合kafka的使用

作者: wayne_wzy | 来源:发表于2018-05-25 09:41 被阅读0次

    本文章针对自己在使用kafka和springboot过程上做一些记录。

    1 .kafka介绍

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)
    是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。
    

    2 .整合过程

    2.1 pom依赖(和SpringBoot整合过程中添加此依赖即可)
      <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
      </dependency>
    
    2.2 kafka相关配置
     在使用kafka过程中,尽量不要使用它自带的zookeper,因为有可能访问不通。项目中我使用的是yml配置文件,其配置如下:
    
    #kafka相关配置
      kafka:
        #kafka服务器地址
        bootstrap-servers: 127.0.0.1:9092  
        #key-value序列化
        consumer:
          group-id: consumer-group
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
    2.3 kafka使用详解
    /**
      * kafka配置类
      */
    @Configuration
    public class KafkaConfig {
    
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<String,String>(producerConfigs());
        }
    
        @Bean
        public Map<String, Object> producerConfigs() {
            HashMap<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }
    
    }
    
    /**
     * @author Wayne
     * @date 2018/5/25
     * <p>
     * desc: kafka消息生产端
     */
    @Component
    public class KafkaProducer  {
    
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
    
        private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
    
        public void sendMessage(String topic, String message) {
            // the KafkaTemplate provides asynchronous send methods returning a
            // Future
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
    
            // you can register a callback with the listener to receive the result
            // success回调
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    ProducerRecord<String, String> producerRecord = result.getProducerRecord();
                    String key = producerRecord.key();
                    String value = producerRecord.value();
                    RecordMetadata recordMetadata = result.getRecordMetadata();
                    String topic = recordMetadata.topic();
                    logger.info("send kafka message='{}' with offset={}", message, result.getRecordMetadata().offset());
                    logger.info("key = '{}'" , key);
                    logger.info("value = '{}'",value);
                    logger.info("topic = '{}'",topic);
                }
                
                @Override
                public void onFailure(Throwable ex) {
                    logger.debug("unable to send kafka message='{}'", message, ex);
                }
            });
        }
    }
    

    测试使用 :

        @Autowired
        private KafkaProducer kafkaProducer;
    
        @GetMapping(value = "/kafka")
        public Response kafkaTest() {
            KafkaMessage kafkaMessage = new KafkaMessage(KafkaMessageType.KAFKA_MESSAGE_TEST, "kafka测试");
            kafkaProducer.sendMessage(KafkaTopicConstant.USER_KAFKA_TOPIC, new Gson().toJson(kafkaMessage));
            return HttpUtil.ResponseSuccess();
        }
    

    Created by wayne 2018/5/25

    相关文章

      网友评论

          本文标题:Springboot整合kafka的使用

          本文链接:https://www.haomeiwen.com/subject/pahyjftx.html