美文网首页Docker容器Java 杂谈
如何使用Docker内的kafka服务

如何使用Docker内的kafka服务

作者: 金桔文案 | 来源:发表于2019-01-22 16:13 被阅读12次

    基于Docker可以很轻松的搭建一个kafka集群,其他机器上的应用如何使用这个kafka集群服务呢?本次实战就来解决这个问题。

    基本情况

    整个实战环境一共有三台机器,各自的职责如下图所示:


    image.png

    整个环境的部署情况如下图:


    image.png
    版本信息

    1.操作系统:Centos7
    2.docker:17.03.2-ce
    3.docker-compose:1.23.2
    4.kafka:0.11.0.3
    5.zookeeper:3.4.9
    6.JDK:1.8.0_191
    7.spring boot:1.5.9.RELEASE
    8.spring-kafka:1.3.8.RELEASE

    重点介绍

    本次实战有几处重点需要注意:

    1.spring-kafka和kafka的版本匹配问题,请关注官方文档:https://spring.io/projects/spring-kafka
    2.kafka的kafka的advertised.listeners配置,应用通过此配置来连接broker;
    3.应用所在服务器要配置host,才能连接到broker;

    接下来开始实战吧;

    配置host

    为了让生产和消费消息的应用能够连接kafka成功,需要配置应用所在服务器的/etc/hosts文件,增加以下一行内容:

    192.168.1.101 kafka1
    

    192.168.1.101是docker所在机器的IP地址;

    请注意,生产和消费消息的应用所在服务器都要做上述配置;

    可能有的读者在此会有疑问:为什么要配置host呢?我把kafka配置的advertised.listeners配置成kafka的IP地址不就行了么?这样的配置我试过,但是用kafka-console-producer.sh和kafka-console-consumer.sh连接kafka的时候会报错"LEADER_NOT_AVAILABLE"。

    在docker上部署kafka

    1.在docker机器上编写docker-compose.yml文件,内容如下:

    version: '2'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        ports:
          - "2181:2181"
      kafka1:
        image: wurstmeister/kafka:2.11-0.11.0.3
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
          KAFKA_LISTENERS: PLAINTEXT://:9092
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_CREATE_TOPICS: "topic001:2:1"
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
    

    上述配置中有两处需要注意:
    第一,KAFKA_ADVERTISED_LISTENERS的配置,这个参数会写到kafka配置的advertised.listeners这一项中,应用会用来连接broker;
    第二,KAFKA_CREATE_TOPICS的配置,表示容器启动时会创建名为"topic001"的主题,并且partition等于2,副本为1;

    2.在docker-compose.yml所在目录执行命令docker-compose up -d,启动容器;
    3.执行命令docker ps,可见容器情况,kafka的容器名为temp_kafka1_1:

    [root@hedy temp]# docker ps
    CONTAINER ID        IMAGE                              COMMAND                  CREATED             STATUS              PORTS                                                NAMES
    ba5374d6245c        wurstmeister/zookeeper             "/bin/sh -c '/usr/..."   About an hour ago   Up About an hour    22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   temp_zookeeper_1
    2c58f46bb772        wurstmeister/kafka:2.11-0.11.0.3   "start-kafka.sh"         About an hour ago   Up About an hour    0.0.0.0:9092->9092/tcp                               temp_kafka1_1
    

    4.执行以下命令可以查看topic001的基本情况:

    docker exec temp_kafka1_1 \
    kafka-topics.sh \
    --describe \
    --topic topic001 \
    --zookeeper zookeeper:2181
    

    看到的信息如下:

    Topic:topic001  PartitionCount:2    ReplicationFactor:1 Configs:
        Topic: topic001 Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic001 Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
    

    源码下载

    接下来的实战是编写生产消息和消费消息的两个应用的源码,您可以选择直接从GitHub下载这两个工程的源码,地址和链接信息如下表所示:


    image.png

    这个git项目中有多个文件夹,本章源码在kafka01103consumer和kafka01103producer这两个文件夹下,如下图红框所示:


    image.png

    接下来开始编码:

    开发生产消息的应用

    1. 创建一个maven工程,pom.xml内容如下:
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.9.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.bolingcavalry</groupId>
        <artifactId>kafka01103producer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>kafka01103producer</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>1.3.8.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.28</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    

    再次强调spring-kafka版本和kafka版本的匹配很重要;

    1. 配置文件application.properties内容:
    #kafka相关配置
    spring.kafka.bootstrap-servers=kafka1:9092
    #设置一个默认组
    spring.kafka.consumer.group-id=0
    #key-value序列化反序列化
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    #每次批量发送消息的数量
    spring.kafka.producer.batch-size=65536
    spring.kafka.producer.buffer-memory=524288
    

    3.发送消息的业务代码只有一个MessageController类:

    package com.bolingcavalry.kafka01103producer.controller;
    
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.web.bind.annotation.*;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.UUID;
    
    /**
     * @Description: 接收web请求,发送消息到kafka
     * @author: willzhao E-mail: zq2599@gmail.com
     * @date: 2019/1/1 11:44
     */
    @RestController
    public class MessageController {
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        @RequestMapping(value = "/send/{name}/{message}", method = RequestMethod.GET)
        public @ResponseBody
        String send(@PathVariable("name") final String name, @PathVariable("message") final String message) {
            SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String timeStr = simpleDateFormat.format(new Date());
    
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("name", name);
            jsonObject.put("message", message);
            jsonObject.put("time", timeStr);
            jsonObject.put("timeLong", System.currentTimeMillis());
            jsonObject.put("bizID", UUID.randomUUID());
    
            String sendMessage = jsonObject.toJSONString();
    
            ListenableFuture future = kafkaTemplate.send("topic001", sendMessage);
            future.addCallback(o -> System.out.println("send message success : " + sendMessage),
                    throwable -> System.out.println("send message fail : " + sendMessage));
    
            return "send message to [" +  name + "] success (" + timeStr + ")";
        }
    }
    

    4.编码完成后,在pom.xml所在目录执行命令mvn clean package -U -DskipTests,即可在target目录下发现文件kafka01103producer-0.0.1-SNAPSHOT.jar,将此文件复制到192.168.1.102机器上;

    5.登录192.168.1.102,在文件kafka01103producer-0.0.1-SNAPSHOT.jar所在目录执行命令java -jar kafka01103producer-0.0.1-SNAPSHOT.jar,即可启动生产消息的应用;

    开发消费消息的应用

    1.创建一个maven工程,pom.xml内容如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.9.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.bolingcavalry</groupId>
        <artifactId>kafka01103consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>kafka01103consumer</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>1.3.8.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    

    再次强调spring-kafka版本和kafka版本的匹配很重要;

    1. 配置文件application.properties内容:
    #kafka相关配置
    spring.kafka.bootstrap-servers=192.168.1.101:9092
    #设置一个默认组
    spring.kafka.consumer.group-id=0
    #key-value序列化反序列化
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    #每次批量发送消息的数量
    spring.kafka.producer.batch-size=65536
    spring.kafka.producer.buffer-memory=524288
    

    3.消费消息的业务代码只有一个Consumer类,收到消息后,会将内容内容和消息的详情打印出来:

    @Component
    public class Consumer {
        @KafkaListener(topics = {"topic001"})
        public void listen(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    
            if (kafkaMessage.isPresent()) {
    
                Object message = kafkaMessage.get();
    
                System.out.println("----------------- record =" + record);
                System.out.println("------------------ message =" + message);
            }
        }
    }
    

    4.编码完成后,在pom.xml所在目录执行命令mvn clean package -U -DskipTests,即可在target目录下发现文件kafka01103consumer-0.0.1-SNAPSHOT.jar,将此文件复制到192.168.1.104机器上;

    5.登录192.168.1.104,在文件kafka01103consumer-0.0.1-SNAPSHOT.jar所在目录执行命令java -jar kafka01103consumer-0.0.1-SNAPSHOT.jar,即可启动消费消息的应用,控制台输出如下:

    2019-01-01 13:41:41.747  INFO 1422 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.2
    2019-01-01 13:41:41.748  INFO 1422 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 73be1e1168f91ee2
    2019-01-01 13:41:41.787  INFO 1422 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 
    2019-01-01 13:41:41.912  INFO 1422 --- [           main] c.b.k.Kafka01103consumerApplication      : Started Kafka01103consumerApplication in 11.876 seconds (JVM running for 16.06)
    2019-01-01 13:41:42.699  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator kafka1:9092 (id: 2147482646 rack: null) for group 0.
    2019-01-01 13:41:42.721  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group 0
    2019-01-01 13:41:42.723  INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
    2019-01-01 13:41:42.724  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group 0
    2019-01-01 13:41:42.782  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group 0 with generation 5
    2019-01-01 13:41:42.788  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [topic001-1, topic001-0] for group 0
    2019-01-01 13:41:42.805  INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[topic001-1, topic001-0]
    2019-01-01 13:48:00.938  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [topic001-1, topic001-0] for group 0
    2019-01-01 13:48:00.939  INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[topic001-1, topic001-0]
    

    上述内容显示了当前应用消费了两个partition;

    6.再启动一个同样的应用,这样每个应用负责一个parititon的消费,做法是在文件kafka01103consumer-0.0.1-SNAPSHOT.jar所在目录执行命令java -jar kafka01103consumer-0.0.1-SNAPSHOT.jar --server.port=8081,看看控制台的输出:

    2019-01-01 13:47:58.068  INFO 1460 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.2
    2019-01-01 13:47:58.069  INFO 1460 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 73be1e1168f91ee2
    2019-01-01 13:47:58.103  INFO 1460 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 
    2019-01-01 13:47:58.226  INFO 1460 --- [           main] c.b.k.Kafka01103consumerApplication      : Started Kafka01103consumerApplication in 11.513 seconds (JVM running for 14.442)
    2019-01-01 13:47:59.007  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator kafka1:9092 (id: 2147482646 rack: null) for group 0.
    2019-01-01 13:47:59.030  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group 0
    2019-01-01 13:47:59.031  INFO 1460 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
    2019-01-01 13:47:59.032  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group 0
    2019-01-01 13:48:00.967  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group 0 with generation 6
    2019-01-01 13:48:00.985  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [topic001-0] for group 0
    2019-01-01 13:48:01.015  INFO 1460 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[topic001-0]
    

    可见新的进程消费的是0号partition,此时再去看看先启动的进程的控制台,见到了新的日志,显示该进程只消费1号pairtition了:

    2019-01-01 13:48:00.955  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group 0 with generation 6
    2019-01-01 13:48:00.960  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [topic001-1] for group 0
    2019-01-01 13:48:00.967  INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[topic001-1]
    
    验证消息的生产和消费

    1.在浏览器输入以下地址:192.168.1.102:8080/send/Tom/hello
    2.浏览器显示返回的结果是:send message to [Tom] success (2019-01-01 13:58:08),表示操作成功;
    3.去检查两个消费者进程的控制台,发现其中一个成功的消费了消息,如下:

    ----------------- record =ConsumerRecord(topic = topic001, partition = 0, offset = 0, CreateTime = 1546351226016, serialized key size = -1, serialized value size = 133, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"timeLong":1546351225804,"name":"Tom","bizID":"4f1b6cf6-78d4-455d-b530-3956723a074f","time":"2019-01-01 22:00:25","message":"hello"})
    ------------------ message ={"timeLong":1546351225804,"name":"Tom","bizID":"4f1b6cf6-78d4-455d-b530-3956723a074f","time":"2019-01-01 22:00:25","message":"hello"}
    

    至此,外部应用使用基于Docker的kafa服务实战就完成了,如果您也在用Docker部署kafka服务,给外部应用使用,希望本文能给您提供一些参考;

    文章来源:https://blog.csdn.net/boling_cavalry/article/details/85528519
    推荐阅读:https://www.roncoo.com/search/docker

    相关文章

      网友评论

        本文标题:如何使用Docker内的kafka服务

        本文链接:https://www.haomeiwen.com/subject/cqaljqtx.html