使用 IDEA 创建两个 module:kafka-producer 和 kafka-consumer
简书链接:https://www.jianshu.com/p/d7771682688b
父级 (kafka-test) pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Parent / aggregator POM of the kafka-test demo.
  It inherits from spring-boot-starter-parent, imports the Spring Cloud BOM and
  declares the dependencies shared by the kafka-producer and kafka-consumer modules.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <!-- Empty relativePath: resolve the Boot parent from the repository instead of probing ../pom.xml -->
        <relativePath/>
    </parent>

    <groupId>com.example.test</groupId>
    <artifactId>kafka-test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <!-- "pom" packaging: this project only aggregates and configures the modules listed below -->
    <packaging>pom</packaging>
    <name>kafka-test</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <!-- Release train used by the spring-cloud-dependencies BOM import below -->
        <spring-cloud.version>Hoxton.SR5</spring-cloud.version>
    </properties>

    <!-- Dependencies declared here are inherited by every child module -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <!-- Do not pin a <version> here (e.g. 2.6.4): use the version managed by Spring Boot to avoid jar conflicts -->
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <!-- Imports the Spring Cloud BOM so the spring-cloud artifacts above need no explicit version -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- Keep Lombok out of the repackaged application archive -->
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <modules>
        <module>kafka-producer</module>
        <module>kafka-consumer</module>
    </modules>
</project>
子级(生产者 kafka-producer) pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- POM of the kafka-producer module; shared dependencies come from the kafka-test parent. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.example.test</groupId>
        <artifactId>kafka-test</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <!-- No relativePath element on purpose: Maven then uses its default, ../pom.xml, to find the parent -->
    </parent>

    <!-- groupId and version repeat the parent's values -->
    <groupId>com.example.test</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter: the producer exposes an HTTP endpoint -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
子级(消费者 kafka-consumer) pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- POM of the kafka-consumer module; shared dependencies come from the kafka-test parent. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.example.test</groupId>
        <artifactId>kafka-test</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <!-- No relativePath element on purpose: Maven then uses its default, ../pom.xml, to find the parent -->
    </parent>

    <!-- groupId and version repeat the parent's values -->
    <groupId>com.example.test</groupId>
    <artifactId>kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
子级(生产者 kafka-producer) application.yml
# Producer configuration: HTTP port, application name and the Spring Cloud Stream Kafka binding.
server:
  port: 8181
spring:
  application:
    name: kafka_producer
  cloud:
    stream:
      kafka:
        binder:
          # Kafka broker address(es)
          brokers: localhost:9092
          # ZooKeeper node(s); for a cluster, list several separated by commas
          zk-nodes: localhost:2181
          # When false, topics are not created automatically, so a topic may be used before it exists
          auto-create-topics: true
      bindings:
        # Binding name: free to choose, but it must equal the channel name in StreamClient.STREAM_DEMO
        stream-demo:
          # Topic the messages are sent to: free to choose, but the consumer must use the same one
          destination: custom-message-topic
          # Format of the sent messages; the sender must declare it, the receiver need not (plain text: text/plain)
          content-type: application/json
子级(消费者 kafka-consumer) application.yml
# Consumer configuration: HTTP port, application name and the Spring Cloud Stream Kafka binding.
server:
  port: 8081
spring:
  application:
    name: kafka_consumer
  cloud:
    stream:
      kafka:
        binder:
          # Kafka broker address(es)
          brokers: localhost:9092
          # ZooKeeper node(s); for a cluster, list several separated by commas
          zk-nodes: localhost:2181
          # When false, topics are not created automatically, so a topic may be used before it exists
          auto-create-topics: true
      bindings:
        # Binding name: free to choose, but it must equal the channel name in StreamClient.STREAM_DEMO
        stream-demo:
          # Topic the messages are read from: free to choose, but the producer must use the same one
          destination: custom-message-topic
          # Message format; the sender must declare it, the receiver need not (plain text: text/plain)
          content-type: application/json
子级(生产者 kafka-producer) StreamClient.java 和 TestController.java
package com.example.test.kafkaproducer.test;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
 * Declares the outbound message channel used by the producer.
 *
 * @author ds (created 2022-04-02)
 */
public interface StreamClient {

    /** Name of the output binding referenced by {@link #streamDataOut()}. */
    String STREAM_DEMO = "stream-demo";

    /**
     * Output channel registered under the name {@link #STREAM_DEMO}.
     *
     * @return the channel on which outgoing messages are sent
     */
    @Output(STREAM_DEMO)
    MessageChannel streamDataOut();
}
package com.example.test.kafkaproducer.test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller that publishes test messages through {@link StreamClient}.
 * The {@code @EnableBinding} annotation on this class names {@code StreamClient} as the binding interface.
 *
 * @author ds (created 2022-04-02)
 */
@RestController
@EnableBinding(StreamClient.class)
public class TestController {

    /** Binding interface that provides the output channel. */
    @Autowired
    private StreamClient streamClient;

    /**
     * Handles {@code GET /produce}: sends 100 messages, numbered 0 to 99, to the output channel.
     *
     * @return the fixed confirmation text once all messages have been handed to the channel
     */
    @GetMapping("/produce")
    public String produce() {
        int sequence = 0;
        while (sequence < 100) {
            // Payload is the fixed prefix followed by the running number
            String payload = "消息" + sequence;
            streamClient.streamDataOut().send(MessageBuilder.withPayload(payload).build());
            sequence++;
        }
        return "成功";
    }
}
子级(消费者 kafka-consumer) StreamClient.java 和 ReceiveData.java
package com.example.test.kafkaconsumer.test;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
 * Declares the inbound message channel used by the consumer.
 *
 * @author ds (created 2022-04-02)
 */
public interface StreamClient {

    /** Name of the input binding referenced by {@link #streamDataInput()}. */
    String STREAM_DEMO = "stream-demo";

    /**
     * Input channel registered under the name {@link #STREAM_DEMO}.
     *
     * @return the channel from which incoming messages are received
     */
    @Input(STREAM_DEMO)
    SubscribableChannel streamDataInput();
}
package com.example.test.kafkaconsumer.test;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
/**
 * Message listener of the consumer module; the {@code @EnableBinding} annotation
 * names {@link StreamClient} as the binding interface.
 *
 * @author ds (created 2022-04-02)
 */
@Slf4j
@EnableBinding(StreamClient.class)
public class ReceiveData {

    /**
     * Listens on the {@link StreamClient#STREAM_DEMO} channel and logs each message at INFO level.
     *
     * @param message the received message as a string
     */
    @StreamListener(StreamClient.STREAM_DEMO)
    public void consume(String message) {
        log.info("接收消息: {} " , message);
    }
}
调用接口http://localhost:8181/produce
image.png
image.png
配置过程中遇到的问题以及处理记录 :
遇到问题1: 使用idea自带maven工具clean出现以下错误
[ERROR] [ERROR] Some problems were encountered while processing the POMs:
[FATAL] Non-resolvable parent POM for com.example.test:kafka-producer:0.0.1-SNAPSHOT: Could not find artifact com.example.test:kafka-test:pom:0.0.1-SNAPSHOT and 'parent.relativePath' points at no local POM @ line 5, column 10
[FATAL] Non-resolvable parent POM for com.example.test:kafka-consumer:0.0.1-SNAPSHOT: Could not find artifact com.example.test:kafka-test:pom:0.0.1-SNAPSHOT and 'parent.relativePath' points at no local POM @ line 5, column 13
@
[ERROR] The build could not read 2 projects -> [Help 1]
[ERROR]
[ERROR] The project com.example.test:kafka-producer:0.0.1-SNAPSHOT (/Users/ds/Documents/gate/code/kafka-test/kafka-producer/pom.xml) has 1 error
[ERROR] Non-resolvable parent POM for com.example.test:kafka-producer:0.0.1-SNAPSHOT: Could not find artifact com.example.test:kafka-test:pom:0.0.1-SNAPSHOT and 'parent.relativePath' points at no local POM @ line 5, column 10 -> [Help 2]
[ERROR]
[ERROR] The project com.example.test:kafka-consumer:0.0.1-SNAPSHOT (/Users/ds/Documents/gate/code/kafka-test/kafka-consumer/pom.xml) has 1 error
[ERROR] Non-resolvable parent POM for com.example.test:kafka-consumer:0.0.1-SNAPSHOT: Could not find artifact com.example.test:kafka-test:pom:0.0.1-SNAPSHOT and 'parent.relativePath' points at no local POM @ line 5, column 13 -> [Help 2]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/ProjectBuildingException
[ERROR] [Help 2] http://cwiki.apache.org/confluence/display/MAVEN/UnresolvableModelException
解决过程1:
在terminal中使用mvn clean install 命令重新清理编译打包,会输出具体的错误信息
[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/StreamClient.java:[3,51] 程序包org.springframework.cloud.stream.annotation不存在
[ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/StreamClient.java:[4,37] 程序包org.springframework.messaging不存在
[ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/StreamClient.java:[15,5] 找不到符号
符号: 类 MessageChannel
位置: 接口 com.example.test.kafkaproducer.test.StreamClient
[ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/TestController.java:[3,45] 程序包org.springframework.messaging.support不存在
[ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/StreamClient.java:[14,6] 找不到符号
符号: 类 Output
位置: 接口 com.example.test.kafkaproducer.test.StreamClient
[ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/TestController.java:[22,47] 找不到符号
......省略部分异常(太长)
解决过程2:
在父级 pom.xml 的 <dependencyManagement> 中重新写入 spring-cloud-dependencies 的声明(如下),然后点击 Import Changes 重新导入依赖(已开启自动导入则无需手动点击)
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
重新mvn clean install
......省略部分异常(太长)
Caused by: java.lang.NoClassDefFoundError: Lorg/springframework/kafka/listener/CommonErrorHandler;
at java.lang.Class.getDeclaredFields0(Native Method) ~[na:1.8.0_201]
at java.lang.Class.privateGetDeclaredFields(Class.java:2583) ~[na:1.8.0_201]
at java.lang.Class.getDeclaredFields(Class.java:1916) ~[na:1.8.0_201]
at org.springframework.util.ReflectionUtils.getDeclaredFields(ReflectionUtils.java:738) ~[spring-core-5.3.18.jar:5.3.18]
... 82 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.listener.CommonErrorHandler
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_201]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_201]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[na:1.8.0_201]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_201]
... 86 common frames omitted
解决过程3:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<!--<version>2.6.4</version> 去掉这一行 -->
</dependency>
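原因:Spring Boot 的依赖管理已经为 spring-kafka 指定了与之匹配的版本,这里又手动指定了 2.6.4,覆盖了受管版本;该版本中没有 CommonErrorHandler 类,于是出现上面的 NoClassDefFoundError。
解决:把 pom 中 spring-kafka 的 <version> 移除,使用 Spring Boot 管理的版本。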
重新mvn clean install
......省略部分异常(太长)
Description:
A component required a bean of type 'com.example.test.kafkaproducer.test.StreamClient' that could not be found.
Action:
Consider defining a bean of type 'com.example.test.kafkaproducer.test.StreamClient' in your configuration.
......省略部分异常(太长)
java.lang.IllegalStateException: Failed to load ApplicationContext
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'testController': Injection of resource dependencies failed; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.example.test.kafkaproducer.test.StreamClient' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@javax.annotation.Resource(shareable=true, lookup=, name=, description=, authenticationType=CONTAINER, type=class java.lang.Object, mappedName=)}
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.example.test.kafkaproducer.test.StreamClient' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@javax.annotation.Resource(shareable=true, lookup=, name=, description=, authenticationType=CONTAINER, type=class java.lang.Object, mappedName=)}
解决过程4:
在 TestController 类上加入以下注解,让 Spring Cloud Stream 为 StreamClient 接口创建可注入的 bean:
@EnableBinding(StreamClient.class)
重新mvn clean install 成功
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.349 s
参考:
https://start.spring.io/
https://blog.csdn.net/Liyq_19/article/details/123740533
https://www.cnblogs.com/owenma/p/15463237.html
https://blog.csdn.net/qq_40708522/article/details/123842483
https://blog.csdn.net/cckevincyh/article/details/107645351?utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~aggregatepage~first_rank_ecpm_v1~rank_v31_ecpm-1-107645351.pc_agg_new_rank&utm_term=stream%E6%B3%A8%E5%85%A5%E4%B8%8D%E4%BA%86MessageChannel%E7%9A%84bean&spm=1000.2123.3001.4430
网友评论