美文网首页
Docker 安装kafka 并集成到spring boot

Docker 安装kafka 并集成到spring boot

作者: 倾国倾城林二狗 | 来源:发表于2020-04-11 12:52 被阅读0次

    #1. 安装kafka前需要安装zookeeper,kafka是需要zookeeper管理的

    docker pull wurstmeister/zookeeper

    提示是否安装最后版本,Y

    #2. 启动zookeeper容器

    docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

    #3.安装kafka

    docker pull wurstmeister/kafka

    #4.启动kafka

    docker run -d --name kafka -p 9092:9092  --env KAFKA_ADVERTISED_HOST_NAME=localhost -e KAFKA_ZOOKEEPER_CONNECT=218.25.54.37:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://218.25.54.37:9092 -e KAFKA_LISTENERS=PLAINTEXT://218.25.54.37:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" --net=host wurstmeister/kafka 

    #5 进入到容器中

    docker exec -it kafka bash

    #6 进入bin文件夹

    #7 创建topic 生产者

    ./kafka-topics.sh --create --zookeeper 218.25.54.37:2181 --replication-factor 1 --partitions 8 --topic test

    ./kafka-console-producer.sh --broker-list 218.25.54.37:9092 --topic test

    #8 打开新的窗口,进入容器,进入bin文件夹 创建消费者

    ./kafka-console-consumer.sh --bootstrap-server 218.25.54.37:9092 --topic test --from-beginning

    在生产者窗口输入消息,在消费者窗口就可以查看了。

    生产者截图:

    消费者截图:

    kafka集成到spring boot

    #1 pom 文件

    <!--kafka-->

    <dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

    </dependency>

    <dependency>

    <groupId>com.google.code.gson</groupId>

    <artifactId>gson</artifactId>

    </dependency>

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import org.slf4j.LoggerFactory;

    import org.springframework.kafka.annotation.KafkaListener;

    import org.springframework.stereotype.Component;

    import java.util.Optional;

    @Component

    public class KafkaReceiver {

    private final org.slf4j.Loggerlog = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = {"linziheng"})

    public void listen(ConsumerRecord record) {

    Optional kafkaMessage = Optional.ofNullable(record.value());

    if (kafkaMessage.isPresent()) {

    Object message = kafkaMessage.get();

    log.info("----------------- record =" + record);

    log.info("------------------ message =" + message);

    }

    }

    }

    测试接口:

    @Autowired

    private KafkaTemplatekafkaTemplate;

    private Gsongson =new GsonBuilder().create();

    @ApiOperation("kafka测试接口")

    @PostMapping("/testKafka")

    public void getRsJournalByUserId() {

    Message message =new Message();

    message.setId(System.currentTimeMillis());

    message.setMsg(UUID.randomUUID().toString());

    message.setSendTime(new Date());

    kafkaTemplate.send("linziheng",gson.toJson(message));

    }

    Message  类

    @Data

    public class Message {

    private Longid;//id

        private Stringmsg;//消息

        private DatesendTime;//时间戳

    }

    请求接口后,控制台打印信息

    相关文章

      网友评论

          本文标题:Docker 安装kafka 并集成到spring boot

          本文链接:https://www.haomeiwen.com/subject/jhfymhtx.html