美文网首页
Spring Boot开发 之 Kafka

Spring Boot开发 之 Kafka

作者: 诺之林 | 来源:发表于2018-11-05 18:56 被阅读16次

    本文的示例代码参考KafkaBasic

    目录

    Kafka

    搭建

    docker run --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime -d zookeeper
    
    docker run --name kafka -p 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=192.168.1.76:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.76 --env KAFKA_ADVERTISED_PORT=9092 -v /etc/localtime:/etc/localtime -d wurstmeister/kafka
    

    测试

    docker exec -it kafka /bin/bash
    
    cd /opt/kafka_2.11-2.0.0/
    
    # 创建topic
    bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
    
    bin/kafka-topics.sh --list --zookeeper zookeeper:2181
    
    # 消息生产者
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    # hello kafka
    
    docker exec -it kafka /bin/bash
    
    cd /opt/kafka_2.11-2.0.0/
    
    # 消息消费者
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    # hello kafka
    

    Spring

    spring init -b 1.5.6.RELEASE -dweb,kafka --build gradle KafkaBasic && cd KafkaBasic
    
    vim ./src/main/resources/application.properties
    
    spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
    spring.kafka.consumer.group-id=test
    

    Model

    vim ./src/main/java/com/example/KafkaBasic/Message.java
    
    package com.example.KafkaBasic;
    
    import java.util.Date;
    
    public class Message {
        private Long id;
    
        private String msg;
    
        private Date sendTime;
    
        public Long getId() {
            return id;
        }
    
        public void setId(Long id) {
            this.id = id;
        }
    
        public String getMsg() {
            return msg;
        }
    
        public void setMsg(String msg) {
            this.msg = msg;
        }
    
        public Date getSendTime() {
            return sendTime;
        }
    
        public void setSendTime(Date sendTime) {
            this.sendTime = sendTime;
        }
    }
    

    Sender

    vim ./src/main/java/com/example/KafkaBasic/Sender.java
    
    package com.example.KafkaBasic;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    import java.util.UUID;
    
    @Component
    public class Sender {
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public String send() throws JsonProcessingException {
            Message message = new Message();
            message.setId(System.currentTimeMillis());
            message.setMsg(UUID.randomUUID().toString());
            message.setSendTime(new Date());
    
            ObjectMapper objectMapper = new ObjectMapper();
            String json = objectMapper.writeValueAsString(message);
    
            kafkaTemplate.send("test", json);
            return json;
        }
    }
    
    vim ./src/main/java/com/example/KafkaBasic/SenderConfig.java
    
    package com.example.KafkaBasic;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class SenderConfig {
    
        @Value("${spring.kafka.consumer.bootstrap-servers}")
        private String bootstrapServers;
    
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String,Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.ACKS_CONFIG, "0");
            return props;
        }
    
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return  new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }
    
        @Bean
        public Sender sender() {
            return new Sender();
        }
    }
    

    Receiver

    vim ./src/main/java/com/example/KafkaBasic/Receiver.java
    
    package com.example.KafkaBasic;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Optional;
    
    @Component
    public class Receiver {
    
        @KafkaListener(topics = {"test"})
        public void listen(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                Object message = kafkaMessage.get();
                System.out.println(message);
            }
        }
    }
    

    Controller

    vim ./src/main/java/com/example/KafkaBasic/TestsController.java
    
    package com.example.KafkaBasic;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @RequestMapping("/tests")
    public class TestsController {
    
        @Autowired
        private Sender sender;
    
        @GetMapping
        public String index() throws JsonProcessingException {
            return sender.send();
        }
    }
    
    • 测试
    ./gradlew bootrun
    
    curl localhost:8080/tests | json
    
    {
      "id": 1541415087205,
      "msg": "3068cc02-8122-459d-b8cf-02ac240ae573",
      "sendTime": 1541415087205
    }
    
    终端打印:
    {"id":1541415087205,"msg":"3068cc02-8122-459d-b8cf-02ac240ae573","sendTime":1541415087205}
    

    参考

    相关文章

      网友评论

          本文标题:Spring Boot开发 之 Kafka

          本文链接:https://www.haomeiwen.com/subject/wiesxqtx.html