RabbitMQ作为消息中间件来使用,主要完成程序间的消息传递。那么至少有2个程序来配合,这里假设一个程序是MessageProvider,一个程序是MessageConsumer,程序都使用spring boot 搭建,集成RabbitMQ。
MessageProvider:
@Component
public class MessageProvider implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public Queue infoQueue() {
return new Queue("info");
}
@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(this);
}
@Async
@Transactional
public void send(Object obj) {
try {
rabbitTemplate.convertAndSend("info",obj, new
CorrelationData(String.valueOf(obj.getId())));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 异步消息确认
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData.getId();
if (ack) {
doSomething(id);
} else {
doSomething(id);
}
}
}
application.yml中rabbitMQ配置:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 123456
publisher-confirms: true
- 该类需要继承RabbitTemplate.ConfirmCallback接口,并手动实现消息确认功能。
- rabbitTemplate.convertAndSend负责发送消息到指定的queue,如果发送的是对象,则该对象实现序列化接口:Serializable,同时程序发送的对象所在的包必须和接收程序该对象的包一致。
- 消息相关对象correlationData可以通过构造器传入消息的特征,比如id之类的,便于消息确认发送成功的是哪条数据。
- confirm方法中参数ack如果为true,则rabbitMQ成功接收到消息,flase则为未接收到。
- new Queue("info"),默认是生成持久化的queue,即未及时消费的消息会存储到磁盘,RabbitMQ重启之后,数据任然存在,保证不丢失。
接下来看一下消费者
MessageConsumer:
@Component
@RabbitListener(queues = "info")
public class MessageConsumer{
@RabbitHandler
public void messageReceive(Object obj, Channel channel, Message message) {
System.out.println(obj.getId())
try {
//消息消费确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
application.yml:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 123456
publisher-confirms: true
listener:
simple:
prefetch: 5
# 采用手动应答
acknowledge-mode: manual
# 当前监听容器数
concurrency: 1
# 最大数
max-concurrency: 1
# 是否支持重试
retry:
enabled: true
- @RabbitListener注解如果放在类上,则类里面的添加了@RabbitHandler的方法都会去监听同一个queue。@RabbitListener也可以放在方法上,让该方法去指定监听哪个queue。
- 如果配置文件开启了重试,则消息消费之后如果没有成功返回给RabbitMQ确认消费,则程序会重新尝试获取该条信息,默认尝试次数为3次,如果3次都失败,则消息从unacked状态变为ready状态,会等待发送给其他监听器。
- 配置文件中prefetch的意思是,同一时间进入到待发送队列的消息数量,进入该待发送队列的消息不会再发送给其他监听者。这个待发送队列有别于queue,是为了应付多个监听时,防止一个消息同一时间被多个监听器获取到。
- acknowledge-mode:manual开启手动消息确认机制,只有RabbitMQ在收到确认后才会将消息从queue中删除。
- concurrency设置的是一个监听器开启几个线程去监听消息;max-concurrency设置的是多个监听器最多可以开启的线程数。
- 如果获取的消息是java对象,则该对象必须和发送程序的对象一模一样,实现同样的Serializable接口,该类所在的包路径也必须一致。
- 如果开启多个程序去监听同一个queue,则RabbitMQ会平均分配消息到各个程序。当然通过RabbitMQ的路由功能可以设置多种消息分发机制。
So,介绍就这么多,欢迎交流!
网友评论