1. docker-compose
部署rockermq
-
编辑
docker-compose.yml
-
编辑 docker-compose.ymlvim docker-compose.yml
-
docker-compose.yml
内容version: "3.7" services: rocketmq-server: image: foxiswho/rocketmq:server container_name: rocketmq-server ports: - 9876:9876 volumes: - ./rocketmq/server/logs:/opt/logs - ./rocketmq/server/store:/opt/store networks: rocketmq: aliases: - rocketmq-server rocketmq-broker: image: foxiswho/rocketmq:broker container_name: rocketmq-broker ports: - 10909:10909 - 10911:10911 volumes: - ./rocketmq/broker/logs:/opt/logs - ./rocketmq/broker/store:/opt/store - ./rocketmq/broker/conf/broker.conf:/etc/rocketmq/broker.conf environment: NAMESRV_ADDR: "rocketmq-server:9876" JAVA_OPTS: " -Duser.home=/opt" JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m" command: mqbroker -c /etc/rocketmq/broker.conf depends_on: - rocketmq-server networks: rocketmq: aliases: - rocketmq-broker rocketmq-console: image: styletang/rocketmq-console-ng container_name: rocketmq-console ports: - 8080:8080 environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-server:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" depends_on: - rocketmq-server networks: rocketmq: aliases: - rocketmq-console networks: rocketmq: name: rocketmq driver: bridge
-
networds
:网络桥接,使rocketmq
的三个组件处于同一网段 -
docker netword ls
命令可以查看docker-compose
网络
-
-
-
编辑
broker.conf
-
rocketmq-broker
有一个配置文件broker.conf
,需要一些自定义的配置 -
根据
docker-compose.yml
中的volumes
所配置的,需要在./rocketmq/broker/conf/
目录下创建一个名为broker.conf
的文件
创建目录# 在当前目录下递归创建目录 mkdir -p rocketmq/broker/conf/
-
broker.conf
内容# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # 所属集群名字 brokerClusterName=DefaultCluster # broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a # 在 broker-b.properties 使用: broker-b brokerName=broker-a # 0 表示 Master,> 0 表示 Slave brokerId=0 # nameServer地址,分号分割 # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed # 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不使用docker 内部IP # brokerIP1=47.115.35.187 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 # 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # Broker 对外服务的监听端口 listenPort=10911 # 删除文件时间点,默认凌晨4点 deleteWhen=04 # 文件保留时间,默认48小时 fileReservedTime=120 # commitLog 每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 # 存储路径 # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store # commitLog 存储路径 # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog # 消费队列存储 # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue # 消息索引存储路径 # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index # checkpoint 文件存储路径 # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint # abort 文件存储路径 # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRole=ASYNC_MASTER # 刷盘方式 # - ASYNC_FLUSH 异步刷盘 # - SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH # 发消息线程池数量 # sendMessageThreadPoolNums=128 # 拉消息线程池数量 # pullMessageThreadPoolNums=128
-
注意
如果部署在
服务器
,那么需要将brokerIP1
的值设置为服务器
的宿主机
的ip
地址,否则,信息将不能注册上去,发送消息将会失败
-
-
启动
-
返回
docker-compose.yml
文件所在的目录
返回docker-compose文件所在的目录# 返回上一次所在的目录 cd -
-
运行
docker-compose.yml
容器运行# 运行 docker-compose.yml docker-compose up -d # 查看容器运行情况 docker ps -a
-
-
浏览器查看 服务器ip:端口号
服务器ip:端口号
2. rocketmq-spring-boot-starter
的简单使用
-
依赖
-
依赖
<!-- rocket mq --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
-
说明
这是
spring-boot
的依赖
-
-
配置文件
-
rocket-mq-provider
提供者的application.yml
主配置文件server: port: 8081 rocketmq: # RocketMq-Server的服务器地址 name-server: 120.25.207.44:9876 producer: # 配置属于哪个 group group: zero-rocket-mq-test-group # 过期时间 send-message-timeout: 30000
-
rocket-mq-consumer
消费者的application.yml
主配置文件server: port: 8082 rocketmq: name-server: 120.25.207.44:9876 producer: send-message-timeout: 30000
-
-
栗子
-
rocket-mq-provider
的controller
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * 说明: * * @author sheng */ @Slf4j @RestController() @RequestMapping("/test") public class TestController { @Resource RocketMQTemplate rocketMqTemplate; @GetMapping("/test") public String test() { // 发送消息的主题 String topic = "test"; // 发送到 RocketMq 的消息 String message = "Hello rocketMq"; // syncSend()是同步发送消息,直接返回 SendResult 对象 // asyncSend() 异步发送消息 // new SendCallback() SendResult 返回的消息对象 rocketMqTemplate.asyncSend(topic, message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { String msgId = sendResult.getMsgId(); log.info("provider_onSuccess: " + msgId); } @Override public void onException(Throwable throwable) { } }); return "rocket mq"; } }
-
rocket-mq-consumer
的listener
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * 说明:@RocketMQMessageListener(topic = "test", consumerGroup = "zero-rocket-mq-test-group")中 * topic 对应提供者 controller 中 * public void asyncSend(String destination,Object payload,org.apache.rocketmq.client.producer.SendCallback sendCallback) * 的 `destination`, consumerGroup 对应提供者主配置文件配置的 group * * @author sheng */ @Slf4j @Component @RocketMQMessageListener(topic = "test", consumerGroup = "zero-rocket-mq-test-group") public class TestListener implements RocketMQListener<String> { @Override public void onMessage(String s) { log.info(s); } }
-
浏览器
-
执行提供者的
controllercontroller
方法 -
查看
消费者RocketMq-console-ng
的消费者
-
-
3. spring-cloud-starter-stream-rocketmq
的简单使用
-
官方文档
-
依赖
-
依赖
<!-- spring-cloud-rocketmq --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency>
- 使用了
spring-cloud-rocketmq
就不需要rocketmq-spring-boot-starter
了
- 使用了
-
父依赖
<properties> <spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version> </properties> <dependencyManagement> <dependencies> <!-- spring-cloud-alibaba-dependencies --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>${spring-cloud-alibaba.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
-
-
配置文件以及启动类
-
rocket-mq-provider
提供者-
application.yml
主配置文件server: port: 8081 spring: application: name: rocket-mq-provider cloud: stream: rocketmq: binder: # 配置 rocketMq 的 nameserver 服务器地址 name-server: 120.25.207.44:9876 bindings: # 定义名为 output 的 binding,名字随意起 # 值为 Map,{} 为 yml 中 Map 的行内写法 output: {destination: test-topic, content-type: application/json} # 测试自定义 binding myOutput1: {destination: my-topic-1, content-type: application/json} myOutput2: {destination: my-topic-2, content-type: application/json}
-
启动类
import com.sheng.cloud.provider.binding.MySource; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; @EnableBinding({Source.class, MySource.class}) @SpringBootApplication public class RocketMqProviderApplication { public static void main(String[] args) { SpringApplication.run(RocketMqProviderApplication.class, args); } }
- 启动类要用
@EnableBinding
注解标记,值是一个Class<?>[] value() default {};
泛型字节码对象数组 -
Source.class
是org.springframework.cloud.stream.messaging.Source
,自带的binding
,用于发送
消息 -
MySource.class
是自定义的binding
- 启动类要用
-
-
rocket-mq-consumer
消费者-
application.yml
主配置文件server: port: 8082 spring: application: name: rocket-mq-consumer cloud: stream: rocketmq: binder: # 配置 rocketMq 的 nameserver 服务器地址 name-server: 120.25.207.44:9876 bindings: # 定义名为 input 的 binding,名字随意起 # 值为 Map,{} 为 yml 中 Map 的行内写法 # {} 里面的值 destination、content-type要与 提供者的 output 的值对应上 input: {destination: test-topic, content-type: application/json, group: test-group} # 自定义 binding myInput1: {destination: my-topic-1, content-type: application/json, group: my-binding-1} myInput2: {destination: my-topic-2, content-type: application/json, group: my-binding-2}
-
启动类
import com.sheng.cloud.provider.binding.MySink; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; @EnableBinding({Sink.class, MySink.class}) @SpringBootApplication public class RocketMqConsumerApplication { public static void main(String[] args) { SpringApplication.run(RocketMqConsumerApplication.class, args); } }
- 启动类要用
@EnableBinding
注解标记,值是一个Class<?>[] value() default {};
泛型字节码对象数组 -
Sink.class
是org.springframework.cloud.stream.messaging.Source
,自带的binding
,用于监听
信息 -
MySink.class
是自定义的binding
- 启动类要用
-
-
-
栗子
-
自带的
binding
栗子-
rocket-mq-provider
提供者-
本例子在
controller
使用import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * 说明:测试自带 binding * * @author sheng */ @RestController @RequestMapping("/test") public class TestController { /** * Resource(name = "output") 中 name 为 主配置文件 application.yml 中配置的 * spring.cloud.stream.bindings.output */ @Resource(name = "output") private MessageChannel messageChannel; @GetMapping("/test/{phone}") public String test(@PathVariable String phone) { String message = "hello_"; // 发送消息 messageChannel.send(MessageBuilder.withPayload(message + phone).build()); return "发送消息成功_" + phone; } }
-
注意
- 启动类中要用
@EnableBinding
标记,并把Source.class
注册上去,使binding
生效
- 启动类中要用
-
-
rocket-mq-consumer
消费者-
本例子创建
listener
import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 说明:必须注册到 spring 容器中 * * @author sheng */ @Slf4j @Component public class TestListener { @StreamListener("input") public void input(String message) { log.info("testListener: " + message); } }
-
listener
必须注册
到容器中 - 方法使用
@StreamListener
标记,值是主配置文件application.yml
中配置的binding
。(org.springframework.cloud.stream.annotation.StreamListener;
)
-
-
注意
-
listener
必须注册
到容器中 - 启动类中要用
@EnableBinding
标记,并把Sink.class
注册上去,使binding
生效
-
-
-
结果
-
先访问
提供者 controller提供者
的controller
,使其发送信息 -
topic 主题 message 消息RocketMq-console-ng
查看topic
主题、message
消息 -
控制台
控制台
-
-
-
自定义
binding
栗子-
rocket-mq-provider
提供者-
自定义
bingding
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 说明:自定义 binding,不需要注册到容器中,在启动类中配合 @EnableBinding 使其生效 * * @author sheng */ public interface MySource { /** * 自定义 binding * @return org.springframework.messaging.MessageChannel 对象 */ @Output("myOutput1") MessageChannel myOutput1(); /** * 自定义 binding * @return org.springframework.messaging.MessageChannel 对象 */ @Output("myOutput2") MessageChannel myOutput2(); }
- 使用
@Output
注解标记方法,值是主配置文件application.yml
中设置的binding
- 方法返回值是
org.springframework.messaging.MessageChannel
,用于发送
消息 - 启动类中要用
@EnableBinding
标记,并把MySource.class
注册上去,使自定义binding
生效
- 使用
-
controller
import com.sheng.cloud.provider.binding.MySource; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * 说明: * * @author sheng */ @RestController @RequestMapping("/binding") public class MyBindingController { @Resource private MySource mySource; @GetMapping("/myOutput1/{phone}") public String myOutput1(@PathVariable String phone) { String message = "自定义 binding myOutput1:"; // 发送消息 mySource.myOutput1().send(MessageBuilder.withPayload(message + phone).build()); return "MyBindingController_myOutput1_" + phone; } @GetMapping("/myOutput2/{phone}") public String myOutput2(@PathVariable String phone) { String message = "自定义 binding myOutput2:"; // 发送消息 mySource.myOutput2().send(MessageBuilder.withPayload(message + phone).build()); return "MyBindingController_myOutput2_" + phone; } }
-
-
rocket-mq-consumer
消费者-
自定义
binding
import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 说明:不需要注册到容器中,在启动类中 @EnableBinding 使其生效 * * @author sheng */ public interface MySink { /** * 自定义 binding * * @return org.springframework.messaging.SubscribableChannel 对象 */ @Input("myInput1") SubscribableChannel myInput1(); /** * 自定义 binding * * @return org.springframework.messaging.SubscribableChannel 对象 */ @Input("myInput2") SubscribableChannel myInput2(); }
- 使用
@Input
注解标记方法,值是主配置文件application.yml
中设置的binding
- 方法返回值是
org.springframework.messaging.SubscribableChannel;
,用于监听
消息 - 启动类中要用
@EnableBinding
标记,并把自定义的MySink.class
注册上去,使自定义binding
生效
- 使用
-
listener
import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 说明:必须注册到容器中 * * @author sheng */ @Slf4j @Component public class TestMyBindingListener { @StreamListener("myInput1") public void myInput1(String message) { log.info("TestMyBindingListener_myInput1: " + message); } @StreamListener("myInput2") public void myInput2(String message) { log.info("TestMyBindingListener_myInput2: " + message); } }
-
listener
必须注册
到容器中 - 方法使用
@StreamListener
标记,值是主配置文件application.yml
中配置的binding
。(org.springframework.cloud.stream.annotation.StreamListener;
)
-
-
-
结果
-
先访问
提供者 controller 提供者 controller提供者
的controller
,使其发送
信息 -
topic 主题 message 消息1 message 消息2RocketMq-console-ng
查看topic
主题、message
消息 -
控制台
控制台
-
-
-
网友评论