Deploying RocketMQ with Docker Compose
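Written on January 19, 2020; my Docker installation is not the latest version.
I. Deploying RocketMQ
First, create a few directories to hold the logs and configuration: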
mkdir -p /usr/local/docker/rocketmq
cd /usr/local/docker/rocketmq
mkdir data
cd data
mkdir logs
mkdir store
mkdir brokerconf
cd brokerconf
vi broker.conf
The contents of broker.conf are as follows:
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
# Change this to your host machine's IP (clients connect to the broker at this address)
brokerIP1=192.168.30.131
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
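Before starting the containers, it can help to sanity-check broker.conf. The following is a minimal sketch, not part of the original setup: the class name BrokerConfCheck and its helper methods are hypothetical. It relies only on the file being plain key=value lines with # comments. It converts the byte-count settings to KiB and flags a brokerIP1 that is missing or a loopback address, since clients outside the container could not reach such an address.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of RocketMQ.
class BrokerConfCheck {

    /** Parses broker.conf text (key=value lines, # comments) into Properties. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Reads a byte-count setting and converts it to whole KiB. */
    static long toKiB(Properties props, String key) {
        return Long.parseLong(props.getProperty(key).trim()) / 1024;
    }

    /** True when brokerIP1 is missing, unresolvable, or a loopback address. */
    static boolean brokerIpLooksUnreachable(Properties props) {
        String ip = props.getProperty("brokerIP1");
        if (ip == null || ip.trim().isEmpty()) {
            return true;
        }
        try {
            return InetAddress.getByName(ip.trim()).isLoopbackAddress();
        } catch (UnknownHostException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A few lines copied from the broker.conf above.
        String conf = String.join("\n",
                "# Change this to your host machine's IP",
                "brokerIP1=192.168.30.131",
                "listenPort=10911",
                "mapedFileSizeCommitLog=1073741824",
                "maxMessageSize=65536");
        Properties props = parse(conf);
        System.out.println("commit log file size (KiB): " + toKiB(props, "mapedFileSizeCommitLog"));
        System.out.println("max message size (KiB): " + toKiB(props, "maxMessageSize"));
        System.out.println("brokerIP1 looks unreachable: " + brokerIpLooksUnreachable(props));
    }
}
```

In a real check you would read the file from ./data/brokerconf/broker.conf instead of an inline string.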
Create docker-compose.yml
cd /usr/local/docker/rocketmq
vi docker-compose.yml
Its contents are as follows:
version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
    networks:
      rmq:
        aliases:
          - rmqnamesrv
  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
      - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
    environment:
      NAMESRV_ADDR: "rmqnamesrv:9876"
      JAVA_OPTS: " -Duser.home=/opt"
      JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker
  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8080:8080
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole
networks:
  rmq:
    name: rmq
    driver: bridge
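PS:
1. rocketmq:server: the RocketMQ NameServer, which brokers register with and clients query
2. rocketmq:broker: the RocketMQ broker, which stores and delivers the messages
3. rocketmq-console-ng: the RocketMQ web console
Run: docker-compose up
Then open http://192.168.75.129:8080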
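If the console page does not load, a quick first step is to check whether the published ports are reachable at all. The sketch below is an illustration only: PortCheck and isOpen are hypothetical names, and the host and ports are the ones used in this post (9876 for the NameServer, 10909 and 10911 for the broker, 8080 for the console). It simply attempts a TCP connection with a timeout.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; uses only the JDK.
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unreachable: treat all as "not open".
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the Docker host used in this post; pass another host as the first argument.
        String host = args.length > 0 ? args[0] : "192.168.75.129";
        int[] ports = {9876, 10909, 10911, 8080};
        for (int port : ports) {
            System.out.println(host + ":" + port + " open = " + isOpen(host, port, 2000));
        }
    }
}
```

An open port only shows that something is listening; it does not prove the broker has registered with the NameServer, which the console's cluster page shows.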
Next, we create a simple Spring Boot demo.
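II. Spring Boot Demo: Producer
Reference (spring-cloud-alibaba): https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ-en
Here we create a simple Spring Boot demo that sends messages to the RocketMQ instance deployed above.
Project source: https://github.com/xcocean/rocketmq-demo
Add the following dependency to pom.xml. Note that this demo uses Spring Boot 2.1.10 (as of January 20, 2020). With Spring Boot 2.2.x it fails with errors, because the features of the two versions differ.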
<!-- RocketMQ dependency -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <version>2.1.1.RELEASE</version>
</dependency>
application.yml
spring:
  jackson:
    time-zone: GMT+8
    date-format: yyyy-MM-dd HH:mm:ss
  cloud:
    # RocketMQ settings
    stream:
      rocketmq:
        binder:
          name-server: 192.168.75.129:9876
      bindings:
        # Custom binding name; the destination test-group is the top-level category (the topic)
        test-group: {destination: test-group, content-type: application/json}
Create MyProduce
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MyProduce {
    @Output("test-group")
    MessageChannel log();
}
Bind the MyProduce interface above in the application's startup class
import com.lingkang.rocketmqdemo.mq.MyProduce;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding({MyProduce.class})
public class RocketmqDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketmqDemoApplication.class, args);
    }
}
Then simply autowire it in the controller
import com.lingkang.rocketmqdemo.mq.MyProduce;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author linke
 * @date 2020-01-19 21:08
 * @description
 */
@RestController
public class MainController {

    @Autowired
    private MyProduce myProduce;

    // Send a plain message through the test-group binding
    @GetMapping("send")
    public Object send(String msg) throws Exception {
        myProduce.log().send(MessageBuilder.withPayload(msg).build());
        return "";
    }

    @GetMapping("send1")
    public Object send1(String msg) {
        // Send with a tag, a key, and a delay header
        MessageBuilder<String> builder = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TAGS, "log")
                .setHeader(RocketMQHeaders.KEYS, "my-key")
                .setHeader("DELAY", "1");
        Message<String> message = builder.build();
        myProduce.log().send(message);
        return message;
    }
}
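A note on the DELAY header in send1: RocketMQ expresses delayed delivery as a delay level rather than a duration. The sketch below is an illustration under two assumptions that the original post does not confirm: that the binder passes the DELAY header through as the message's delay level, and that the broker keeps the default messageDelayLevel list. DelayLevelSketch and delayFor are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical lookup table for illustration; not a RocketMQ API.
class DelayLevelSketch {

    // Assumption: the broker's default messageDelayLevel setting is unchanged.
    static final List<String> DEFAULT_LEVELS = Arrays.asList(
            "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
            "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h");

    /** Maps a 1-based delay level to its delay; level 0 means no delay. */
    static String delayFor(int level) {
        if (level == 0) {
            return "none";
        }
        if (level < 0 || level > DEFAULT_LEVELS.size()) {
            // This sketch rejects out-of-range levels instead of guessing.
            throw new IllegalArgumentException("unknown delay level: " + level);
        }
        return DEFAULT_LEVELS.get(level - 1);
    }

    public static void main(String[] args) {
        System.out.println("DELAY=1 -> " + delayFor(1));
    }
}
```

Under those assumptions, the "1" set in send1 would correspond to the shortest delay in the list.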
Visit http://localhost:8080/send?msg=%E4%BD%A0%E5%A5%BD11111111
and http://localhost:8080/send1?msg=%E4%BD%A0%E5%A5%BD11111111
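The msg parameter in these URLs is the UTF-8 percent-encoded form of a Chinese greeting followed by digits. As a minimal sketch (SendUrlBuilder, sendUrl and get are hypothetical names, and the base URL assumes the demo runs on localhost:8080 as above), the same URL can be built and called from Java using only the JDK:

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical client helper for illustration; uses only the JDK.
class SendUrlBuilder {

    /** Builds base + path + "?msg=" with the message percent-encoded as UTF-8. */
    static String sendUrl(String base, String path, String msg) {
        try {
            String encoded = URLEncoder.encode(msg, StandardCharsets.UTF_8.name());
            return base + path + "?msg=" + encoded;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM.
            throw new IllegalStateException(e);
        }
    }

    /** Issues a GET request and returns the HTTP status code. */
    static int get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        conn.setRequestMethod("GET");
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // \u4f60\u597d is the two-character greeting used in the URLs above.
        String url = sendUrl("http://localhost:8080", "/send", "\u4f60\u597d11111111");
        System.out.println(url);
        System.out.println("status = " + get(url));
    }
}
```

Note that URLEncoder applies form encoding, so a space in the message becomes "+", which is fine for a query parameter like this one.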
The project structure is as follows:
(screenshot: project structure)
III. Spring Boot Demo: Consumer
This builds on the project above.
Modify application.yml to add the consumer binding
spring:
  jackson:
    time-zone: GMT+8
    date-format: yyyy-MM-dd HH:mm:ss
  cloud:
    # RocketMQ settings
    stream:
      rocketmq:
        binder:
          name-server: 192.168.75.129:9876
      bindings:
        # Custom binding name; the destination test-group is the top-level category (the topic)
        test-group: {destination: test-group, content-type: application/json}
        # consumer.maxAttempts:
        # the maximum number of times a message may be delivered for consumption, including the first delivery.
        # A value of 1 means no retries. The value must be greater than 0.
        input-consumer: {destination: test-group, content-type: text/plain,
                         group: test-group, consumer.maxAttempts: 1}
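To make the meaning of consumer.maxAttempts concrete, here is a small stand-alone sketch of the counting rule described in the comments: the first delivery counts as attempt 1, so a value of 1 means a single try with no retry. This is only an illustration of the semantics, not the binder's actual retry implementation; MaxAttemptsSketch and deliver are hypothetical names.

```java
import java.util.function.Consumer;

// Hypothetical illustration of the maxAttempts counting rule; not Spring Cloud Stream code.
class MaxAttemptsSketch {

    /**
     * Calls the handler until it succeeds or maxAttempts is used up.
     * Returns the number of attempts used; rethrows the last failure if all attempts fail.
     */
    static int deliver(String payload, Consumer<String> handler, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be greater than 0");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(payload);
                return attempt; // success: the first delivery is attempt 1
            } catch (RuntimeException e) {
                last = e; // remember the failure and retry if attempts remain
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int used = deliver("hello", msg -> System.out.println("consumed: " + msg), 1);
        System.out.println("attempts used: " + used);
    }
}
```

With maxAttempts set to 1, as in the configuration above, a handler that throws is not called again.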
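Create MyConsumer, and register it next to MyProduce in the startup class, i.e. @EnableBinding({MyProduce.class, MyConsumer.class}), so that the input-consumer channel is bound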
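import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MyConsumer {
    // Input channels are conventionally declared as SubscribableChannel
    @Input("input-consumer")
    SubscribableChannel log();
}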
Create the receiving listener MyConsumerReceive
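import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

@Component
public class MyConsumerReceive {
    // Invoked for every message that arrives on the input-consumer binding
    @StreamListener("input-consumer")
    public void receiveConsumer(Object msg) {
        System.out.println("Message consumed: " + msg);
    }
}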
The result is as follows:
(screenshot: message consumed)