1个Topic(主题)只创建1个Partition(分区)
使用idea创建两个module , kafka-producer-order , kafka-consumer-order
pom.xml kafka相关
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<!--<version>2.6.4</version> 去掉这一行 -->
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
消费者(kafka-consumer-order) application.yml
server:
port: 8083
spring:
application:
name: kafka_consumer_order
cloud:
stream:
kafka:
binder:
brokers: localhost:9092 #Kafka的消息中间件服务器
zk-nodes: localhost:2181 #Zookeeper的节点,如果集群,后面加,号分隔
auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
bindings:
stream-demo-order: #这里可以任意写,生产者应与之一致
destination: custom-message-topic-order #这里可以任意写,生产者应与之一致,消息发往的目的地
content-type: application/json #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* @Author ds
* @Date 2022-04-07
*/
public interface StreamClient {
String STREAM_DEMO = "stream-demo-order";
@Input(StreamClient.STREAM_DEMO)
SubscribableChannel streamDataInput();
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
/**
* @Author ds
* @Date 2022-04-07
*/
@Slf4j
@EnableBinding(StreamClient.class)
public class ReceiveData {
@StreamListener(StreamClient.STREAM_DEMO)
public void consume(String message){
log.info("接收消息: {} " , message);
}
}
生产者(kafka-producer-order) application.yml
server:
port: 8182
spring:
application:
name: kafka_producer_order
cloud:
stream:
kafka:
binder:
brokers: localhost:9092 #Kafka的消息中间件服务器
zk-nodes: localhost:2181 #Zookeeper的节点,如果集群,后面加,号分隔
auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
bindings:
stream-demo-order: #这里可以任意写,消费者应与之一致
destination: custom-message-topic-order #这里可以任意写,消费者应与之一致,消息发往的目的地
content-type: application/json #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @Author ds
* @Date 2022-04-02
*/
public interface StreamClient {
String STREAM_DEMO = "stream-demo-order";
@Output(StreamClient.STREAM_DEMO)
MessageChannel streamDataOut();
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author ds
* @Date 2022-04-02
*/
@RestController
@EnableBinding(StreamClient.class)
public class TestController {
@Autowired
private StreamClient streamClient;
@GetMapping("/produce")
public String produce(){
for(int i = 0; i < 100 ; i++){
streamClient.streamDataOut().send(MessageBuilder.withPayload("消息" + i).build());
}
return "成功";
}
}
网友评论