MQ的作用
1)解耦:在项目启动之初是很难预测未来会遇到什么困难的,消息中间件在处理过程中插入了一个隐含的,基于数据的接口层,两边都实现这个接口,这样就允许独立的修改或者扩展两边的处理过程,只要两边遵守相同的接口约束即可。
2)冗余(存储):在某些情况下处理数据的过程中会失败,消息中间件允许把数据持久化知道他们完全被处理
3)扩展性:消息中间件解耦了应用的过程,所以提供消息入队和处理的效率是很容易的,只需要增加处理流程就可以了。
4)削峰:在访问量剧增的情况下,但是应用仍然需要发挥作用,但是这样的突发流量并不常见。而使用消息中间件采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃
5)可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合性,当一个处理消息的进程挂掉后,加入消息中间件的消息仍然可以在系统恢复后重新处理
6)顺序保证:在大多数场景下,处理数据的顺序也很重要,大部分消息中间件支持一定的顺序性
7)缓冲:消息中间件通过一个缓冲层来帮助任务最高效率的执行
8)异步通信:通过把把消息发送给消息中间件,消息中间件并不立即处。
举个栗子,秒杀业务:
用户端发起下单操作
服务端完成秒杀业务逻辑(库存检查,库存冻结,余额检查,余额冻结,订单生成,余额扣减,库存扣减,生成流水,余额解冻,库存解冻)
用户端下单业务简单,每秒发起了10000个请求,服务端秒杀业务复杂,每秒只能处理2000个请求,很有可能用户端不限速的下单,导致服务端系统被压垮,引发雪崩。
为了避免雪崩,常见的优化方案有两种:
1)业务用户端队列缓冲,限速发送
2)业务服务端队列缓冲,限速执行
服务端的限速执行
rabbitmq提供了一种服务质量保障功能,即在非自动确认消息的前提下,如果一定数目的消息未被确认,不进行消费新的消息。
basicQos
方法在RabbitMQ的Java驱动中对应三个方法:
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
// prefetchSize = 0
void basicQos(int prefetchCount, boolean global) throws IOException;
// prefetchSize = 0 , global = false
void basicQos(int prefetchCount) throws IOException;
prefetchSize:预读取的消息内容大小上限(包含),可以简单理解为消息有效载荷字节数组的最大长度限制,0表示无上限。
prefetchCount:预读取的消息数量上限,0表示无上限。
global:false表示prefetchCount单独应用于信道上的每个新消费者,true表示prefetchCount在同一个信道上的消费者共享。
消费者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置 RabbitMQ 地址
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
// 创建一个新的连接
Connection connection = connectionFactory.newConnection();
// 创建一个新的频道
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
// 声明要关注的队列
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//1 限流方式 第一件事就是 autoAck设置为 false
channel.basicQos(0,3,false);
// 不自动回复队列应答 -- RabbitMQ 中的消息确认机制,
channel.basicConsume(queueName,false,new MyConsumer(channel));
}
}
自定义消费者代码
// DefaultConsumer类 实现了 Consumer 接口, 通过传入一个频道, 告诉服务器我们需要哪个频道的消息
// 如果频道中有消息, 就会执行回调函数 handleDelivery
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
//当我们需要确认一条消息已经被消费时,我们调用的 basicAck 方法的第一个参数是 Delivery Tag。
//Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,
//以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
//RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
生产者代码
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
// 创建一个新的连接
Connection connection = connectionFactory.newConnection();
// 创建一个新的频道
Channel channel = connection.createChannel();
String exchange = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello RabbitMQ QOS Message";
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
函数:void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
exchange 做消息转发用
routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic
mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉
immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
mandatory
标志告诉服务器至少将该消息发送到一个队列中,否则将消息返还给生产者;immediate
标志告诉服务器如果该消息关联的queue
上有消费者,则马上将消息投递给它,如果所有queue
都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
参考文章https://www.jianshu.com/p/adf0b7de6753
网友评论