之前使用kafka只有2种模式
1.生产者消费者
2.发布订阅
而ribbitmq却有三种模式 fanout,topic,direct
即除了一对一,一对多,之外加了多对多的关系
但是这里还是只展示生产者消费者,发布订阅模式;
目标:相同子系统开多个节点,只会消费一次
不同子系统,每个系统都消费一次
思路:相同的queue bing在topic上只会消费一次
不同的queue bing在topic上,每个queue都会消费,由于之前按使用kafka总想着有没有group来设置不同系统之间的消费,陷入了误区,其实rabbitmq的queue 是group + topic 的集合,在下面的演示中可以发现。
安装
docker-compose.yml
rabbitmq:
image: rabbitmq:management-alpine
container_name: sc-rabbitmq
volumes:
- ./data/rabbitmq:/var/lib/rabbitmq/mnesia
networks:
- sc-net
ports:
- 5672:5672
- 15672:15672
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
networks:
sc-net:
external: false
springboot 2.x 整合
1.引入pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application配置
spring:
application:
name: spirng-boot-ribbit
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
template:
exchange: SPRING-BOOT-EXCHANGE
3.测试
2个spring boot moudle 1个既可以发消息又可以收消息, 另一个只接受消息,但是同时启动2个节点
3.1生产者
@Component
public class GoodsProducer {
@Autowired
RabbitTemplate rabbitTemplate;
public void send(String type, String id) {
try {
//exchange : 此处 exchange 可随意命名,但是要保证消费者bing一致
//key : goods 可以表示任意业务,type 表示业务行为
//message : id即发送的消息,要求尽可能少,可以序列化后传输
rabbitTemplate.convertAndSend("SPRING-BOOT-EXCHANGE", "goods." + type, id);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.2生产者模块下的消费者
@Component
public class GoodsReceiver {
//queue命名携带模块信息保证不同模块重复消费
//exchange保持一致,key满足包含,相等, #,等规则
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "PRODUCER.GOODS.ADD",durable = "true"),
exchange = @Exchange(value = "SPRING-BOOT-EXCHANGE",type = ExchangeTypes.TOPIC),
key = {"goods.add"}))
public void handleAddGoods(String goodsId){
System.out.println("1:======================"+goodsId+"============================");
}
}
3.3 消费者模块下的消费者
@Component
public class GoodsReceiver {
//同3.2
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "RECEIVER.GOODS.ADD",durable = "true"),
exchange = @Exchange(value = "SPRING-BOOT-EXCHANGE",type = ExchangeTypes.TOPIC),
key = {"goods.add"}))
public void handleAddGoods(String goodsId){
System.out.println("2:======================"+goodsId+"============================");
}
}
3.4
测试用例
public class RabbitMQTest extends TestApplicationTests {
@Autowired
GoodsProducer goodsProducer;
@Test
void send(){
for (int i = 0; i < 10; i++) {
goodsProducer.send("add","1");
}
}
}
3.5结果
通过控制台打印可以发现,生产者模块下 打印0-9
2个消费者模块下轮询打印,各消费5个
4.最后
可以尝试和kafka一样配置:
在queue前默认添加模块名;
exchange 默认是项目名 ;
在消息发送,接受时统一对消息序列化与反序列化;
尽可能忽略消息的异常,统一通过日志中心区修复问题。
网友评论