美文网首页
二、Spring Cloud Stream整合Kafka

二、Spring Cloud Stream整合Kafka

作者: 一介书生独醉江湖 | 来源:发表于2022-04-05 12:16 被阅读0次
    使用idea创建两个module , kafka-producer , kafka-consumer
    简书链接:https://www.jianshu.com/p/d7771682688b
    
    1)父级 (kafka-test) pom.xml
    
        <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.1.RELEASE</version>
            <!--<relativePath/> &lt;!&ndash; lookup parent from repository &ndash;&gt;-->
        </parent>
        <groupId>com.example.test</groupId>
        <artifactId>kafka-test</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>pom</packaging>
        <name>kafka-test</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Hoxton.SR5</spring-cloud.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
    
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <!--<version>2.6.4</version>  去掉这一行 -->
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
                </plugin>
            </plugins>
        </build>
        <modules>
            <module>kafka-producer</module>
            <module>kafka-consumer</module>
        </modules>
    </project>
    
    
    子级(生产者 kafka-producer)  pom.xml
    
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.example.test</groupId>
            <artifactId>kafka-test</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <!--<relativePath/> &lt;!&ndash; lookup parent from repository &ndash;&gt;-->
        </parent>
        <groupId>com.example.test</groupId>
        <artifactId>kafka-producer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>kafka-producer</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    子级(消费者 kafka-consumer)  pom.xml
    
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.example.test</groupId>
            <artifactId>kafka-test</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <!--<relativePath/> &lt;!&ndash; lookup parent from repository &ndash;&gt;-->
        </parent>
        <groupId>com.example.test</groupId>
        <artifactId>kafka-consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>kafka-consumer</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    
    子级(生产者 kafka-producer)  application.yml
    
    server:
      port: 8181
    
    spring:
      application:
        name: kafka_producer
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092   #Kafka的消息中间件服务器
              zk-nodes: localhost:2181  #Zookeeper的节点,如果集群,后面加,号分隔
              auto-create-topics: true  #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
          bindings:
            stream-demo:                          #这里可以任意写,消费者应与之一致
              destination: custom-message-topic   #这里可以任意写,消费者应与之一致,消息发往的目的地
              content-type: application/json      #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
    
    
    子级(消费者 kafka-consumer)  application.yml
    
    server:
      port: 8081
    
    spring:
      application:
        name: kafka_consumer
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092   #Kafka的消息中间件服务器
              zk-nodes: localhost:2181  #Zookeeper的节点,如果集群,后面加,号分隔
              auto-create-topics: true  #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
          bindings:
            stream-demo:                          #这里可以任意写,生产者应与之一致
              destination: custom-message-topic   #这里可以任意写,生产者应与之一致,消息发往的目的地
              content-type: application/json      #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
    
    
    子级(生产者 kafka-producer) 
    
    package com.example.test.kafkaproducer.test;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    /**
     * @Author ds
     * @Date 2022-04-02
     */
    public interface StreamClient {
    
        String STREAM_DEMO = "stream-demo";
    
        @Output(StreamClient.STREAM_DEMO)
        MessageChannel streamDataOut();
    
    
    }
    
    
    package com.example.test.kafkaproducer.test;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @Author ds
     * @Date 2022-04-02
     */
    @RestController
    @EnableBinding(StreamClient.class)
    public class TestController {
    
        @Autowired
        private StreamClient streamClient;
    
        @GetMapping("/produce")
        public String produce(){
            for(int i = 0; i < 100 ; i++){
                streamClient.streamDataOut().send(MessageBuilder.withPayload("消息" + i).build());
            }
            return "成功";
        }
    }
    
    
    子级(消费者 kafka-consumer) 
    
    package com.example.test.kafkaconsumer.test;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * @Author ds
     * @Date 2022-04-02
     */
    public interface StreamClient {
    
        String STREAM_DEMO = "stream-demo";
    
        @Input(StreamClient.STREAM_DEMO)
        SubscribableChannel streamDataInput();
    }
    
    package com.example.test.kafkaconsumer.test;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    
    /**
     * @Author ds
     * @Date 2022-04-02
     */
    @Slf4j
    @EnableBinding(StreamClient.class)
    public class ReceiveData {
    
        @StreamListener(StreamClient.STREAM_DEMO)
        public void consume(String message){
            log.info("接收消息: {} " , message);
        }
    }
    
    调用接口http://localhost:8181/produce
    
    image.png image.png
    配置过程中遇到的问题以及处理记录 : 
    
    遇到问题1: 使用idea自带maven工具clean出现以下错误
    
    [ERROR] [ERROR] Some problems were encountered while processing the POMs:
    [FATAL] Non-resolvable parent POM for com.example.test:kafka-producer:0.0.1-SNAPSHOT: Could not find artifact com.example.test:kafka-test:pom:0.0.1-SNAPSHOT and 'parent.relativePath' points at no local POM @ line 5, column 10
    [FATAL] Non-resolvable parent POM for com.example.test:kafka-consumer:0.0.1-SNAPSHOT: Could not find artifact com.example.test:kafka-test:pom:0.0.1-SNAPSHOT and 'parent.relativePath' points at no local POM @ line 5, column 13
     @ 
    [ERROR] The build could not read 2 projects -> [Help 1]
    [ERROR]   
    [ERROR]   The project com.example.test:kafka-producer:0.0.1-SNAPSHOT (/Users/ds/Documents/gate/code/kafka-test/kafka-producer/pom.xml) has 1 error
    [ERROR]     Non-resolvable parent POM for com.example.test:kafka-producer:0.0.1-SNAPSHOT: Could not find artifact com.example.test:kafka-test:pom:0.0.1-SNAPSHOT and 'parent.relativePath' points at no local POM @ line 5, column 10 -> [Help 2]
    [ERROR]   
    [ERROR]   The project com.example.test:kafka-consumer:0.0.1-SNAPSHOT (/Users/ds/Documents/gate/code/kafka-test/kafka-consumer/pom.xml) has 1 error
    [ERROR]     Non-resolvable parent POM for com.example.test:kafka-consumer:0.0.1-SNAPSHOT: Could not find artifact com.example.test:kafka-test:pom:0.0.1-SNAPSHOT and 'parent.relativePath' points at no local POM @ line 5, column 13 -> [Help 2]
    [ERROR] 
    [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
    [ERROR] Re-run Maven using the -X switch to enable full debug logging.
    [ERROR] 
    [ERROR] For more information about the errors and possible solutions, please read the following articles:
    [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/ProjectBuildingException
    [ERROR] [Help 2] http://cwiki.apache.org/confluence/display/MAVEN/UnresolvableModelException
    
    解决过程1:
    在terminal中使用mvn clean install 命令重新清理编译打包,会输出具体的错误信息
    
    [ERROR] COMPILATION ERROR : 
    [INFO] -------------------------------------------------------------
    [ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/StreamClient.java:[3,51] 程序包org.springframework.cloud.stream.annotation不存在
    [ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/StreamClient.java:[4,37] 程序包org.springframework.messaging不存在
    [ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/StreamClient.java:[15,5] 找不到符号
      符号:   类 MessageChannel
      位置: 接口 com.example.test.kafkaproducer.test.StreamClient
    [ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/TestController.java:[3,45] 程序包org.springframework.messaging.support不存在
    [ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/StreamClient.java:[14,6] 找不到符号
      符号:   类 Output
      位置: 接口 com.example.test.kafkaproducer.test.StreamClient
    [ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/TestController.java:[22,47] 找不到符号
    ......省略部分异常(太长)
    
    解决过程2:
    把pom.xml中的 <artifactId>spring-cloud-dependencies</artifactId>重新编写一下,点击Import Changes(自动引入的不用)
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
    重新mvn clean install
    
    ......省略部分异常(太长)
    Caused by: java.lang.NoClassDefFoundError: Lorg/springframework/kafka/listener/CommonErrorHandler;
            at java.lang.Class.getDeclaredFields0(Native Method) ~[na:1.8.0_201]
            at java.lang.Class.privateGetDeclaredFields(Class.java:2583) ~[na:1.8.0_201]
            at java.lang.Class.getDeclaredFields(Class.java:1916) ~[na:1.8.0_201]
            at org.springframework.util.ReflectionUtils.getDeclaredFields(ReflectionUtils.java:738) ~[spring-core-5.3.18.jar:5.3.18]
            ... 82 common frames omitted
    Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.listener.CommonErrorHandler
            at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_201]
            at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_201]
            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[na:1.8.0_201]
            at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_201]
            ... 86 common frames omitted
    
    
    解决过程3:
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <!--<version>2.6.4</version>  去掉这一行 -->
            </dependency>
    
      因为spring boot 自己管理版本,用了spring boot,又自己指定了版本所以导致jar包冲突
      把pom中的version移除,使用spring boot管理的版本
    
    重新mvn clean install
    
    ......省略部分异常(太长)
    Description:
    
    A component required a bean of type 'com.example.test.kafkaproducer.test.StreamClient' that could not be found.
    Action:
    Consider defining a bean of type 'com.example.test.kafkaproducer.test.StreamClient' in your configuration.
    ......省略部分异常(太长)
    java.lang.IllegalStateException: Failed to load ApplicationContext
    Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'testController': Injection of resource dependencies failed; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.example.test.kafkaproducer.test.StreamClient' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@javax.annotation.Resource(shareable=true, lookup=, name=, description=, authenticationType=CONTAINER, type=class java.lang.Object, mappedName=)}
    Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.example.test.kafkaproducer.test.StreamClient' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@javax.annotation.Resource(shareable=true, lookup=, name=, description=, authenticationType=CONTAINER, type=class java.lang.Object, mappedName=)}
    
    解决过程4:
    在TestController中加入注解
    @EnableBinding(StreamClient.class)
    
    重新mvn clean install 成功
    
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 9.349 s
    
    参考:
    https://start.spring.io/
    https://blog.csdn.net/Liyq_19/article/details/123740533
    https://www.cnblogs.com/owenma/p/15463237.html
    https://blog.csdn.net/qq_40708522/article/details/123842483
    https://blog.csdn.net/cckevincyh/article/details/107645351?utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~aggregatepage~first_rank_ecpm_v1~rank_v31_ecpm-1-107645351.pc_agg_new_rank&utm_term=stream%E6%B3%A8%E5%85%A5%E4%B8%8D%E4%BA%86MessageChannel%E7%9A%84bean&spm=1000.2123.3001.4430
    
    

    相关文章

      网友评论

          本文标题:二、Spring Cloud Stream整合Kafka

          本文链接:https://www.haomeiwen.com/subject/fueajrtx.html