SpringCloud Stream Kafka
基础知识
消息服务器作用:
一对多通知
并行转串行,削峰,削流
JMS - Java Message Service
Java定义的消息模型或者规范,有一套标准的Java API访问消息服务器,类似于jdbc访问数据库
消息服务器产品:
ActiveMQ/RabbitMQ/RocketMQ/Kafka
ActiveMQ/RabbitMQ/RocketMQ实现了JMS
Kafka没有实现JMS
SpringCloud Stream:SpringCloud的消息服务接口,可以使用它访问:
RabbitMQ/Kafka
Kafka需要Zookeeper作为数据存储。
安装Kafka
官网下载:http://kafka.apache.org/downloads
安装和使用:http://kafka.apache.org/quickstart
课程使用:kafka_2.12-2.2.0.zip
链接:https://pan.baidu.com/s/1Yy0BigcXRhFblVIrdtvRmw
提取码:q4o3
解压
启动Kafka
点击start.bat
会启动2个cmd窗口:
使用,请参考zip包中的windows安装指南.txt
创建主题
启动一个cmd窗口,切换目录到kafka解压目录:
执行命令:
bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic myTopic
执行如下命令查询新建的主题:
bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
发送消息
输入命令:bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic myTopic
输入消息内容:abc
接收消息
另外启动一个新的cmd窗口,切换目录到kafka解压目录:
执行命令:
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic myTopic --from-beginning
启动之后,可以看到上面生产者发送的abc消息。
生产者继续发送abcd消息,消费者可以立即收到。
SpringCloud-Stream消息模型
Source:发射器
Channel:频道
Binder:绑定器
Sink:接收器
消费者分组:
假设下图中服务A的3个实例为一个分组,服务B的2个实例为一个分组,则消息过来之后,只会有一个服务A的实例和服务B的实例处理该消息,其他实例不会收到消息。
使用默认频道进行消息发送和接收
创建项目
父项目springcloud-stream
\springcloud-stream\pom.xml
创建子项目
\springcloud-stream\springcloud-stream-producer
\springcloud-stream\springcloud-stream-consumer
配置:
/springcloud-stream-producer/pom.xml
/springcloud-stream-consumer/pom.xml
发送消息
创建消息发送者类
/springcloud-stream-producer/src/main/java/com/qfedu/demo/springcloud/stream/StreamProducer.java
配置频道及kafka
/springcloud-stream-producer/src/main/resources/application.yml
接收消息
创建消息接收者类
/springcloud-stream-consumer/src/main/java/com/qfedu/demo/springcloud/stream/StreamConsumer.java
配置频道及kafka
测试
参照第2节,启动kafka服务器
启动生产者
启动消费者
访问如下路径,发送消息qfedu2:
消息接收:
自定义频道发送和接收消息
消息生产者
自定义消息输出频道
/springcloud-stream-producer/src/main/java/com/qfedu/demo/springcloud/stream/QfOutputChannel.java
配置spring
映射输出频道到kafka主题
/springcloud-stream-producer/src/main/resources/application.yml
启动类
绑定发射器,并增加发送消息代码
/springcloud-stream-producer/src/main/java/com/qfedu/demo/springcloud/stream/StreamProducer.java
消息消费者
自定义消息输入频道
/springcloud-stream-consumer/src/main/java/com/qfedu/demo/springcloud/stream/QfInputChannel.java
配置spring
/springcloud-stream-consumer/src/main/resources/application.yml
启动类
/springcloud-stream-consumer/src/main/java/com/qfedu/demo/springcloud/stream/StreamConsumer.java
测试
网友评论