简介:
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ.开源产品 activeMQ,RabbitMQ 阿里RocketMQ(摘自百度)
使用场景:
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
举例:
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
RabbitMQ 特点:
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
-
1.可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 -
2.灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 -
3.消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 -
4.高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 -
5.多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 -
6.多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 -
7.管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 -
8.跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 -
9.插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
词语解释:
image.pngBrocker:消息队列服务器实体。
Exchange:消息交换机,指定消息按什么规则,路由到哪个队列。
Queue:消息队列,每个消息都会被投入到一个或者多个队列里。
Binding:绑定,它的作用是把exchange和queue按照路由规则binding起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
Vhost:虚拟主机,一个broker里可以开设多个vhost,用作不用用户的权限分离。RabbitMQ 默认的 vhost 是 /
Producer:消息生产者,就是投递消息的程序。
Consumer:消息消费者,就是接受消息的程序。
Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
RabbitMQ使用场景:
学习RabbitMQ的使用场景,来自官方教程:https://www.rabbitmq.com/getstarted.html
添加pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 场景1:单发送单接收
使用场景:简单的发送与接收,没有特别的处理。
生产者:
public class Producer {
private final static String QUEUE_NAME = "test";
public static void main(String[] argv) throws Exception {
/* 创建连接工厂 */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
/* 创建连接 */
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)
// 第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 需发送的信息
String message = "Hello World!";
//第一个参数为交换机名称、
// 第二个参数为队列映射的路由key、
// 第三个参数为消息的其他属性、
// 第四个参数为发送信息的主体
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费端 :
public class Consumer {
private final static String QUEUE_NAME = "test";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
/* 创建连接 */
Connection connection = factory.newConnection();
/* 创建信道 */
Channel channel = connection.createChannel();
// 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received '" + message + "'");
}
};
// 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
channel.basicConsume(QUEUE_NAME, true, consumer);
}
结果:
生产 者 消费者
- 场景2:单发送多接收
使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。
生产端:
public class Producer {
private static final String TASK_QUEUE_NAME = "work_quene";
public static void main(String[] argv) throws Exception {
/* 创建连接工厂 */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
/* 创建连接 */
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
//分发信息
for (int i = 0; i < 10; i++) {
String message = "Hello RabbitMQ" + i;
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("NewTask send '" + message + "'");
}
channel.close();
connection.close();
}
}
消费者端:
package com.rabbitmq.mq.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
private static final String TASK_QUEUE_NAME = "work_quene";
public static void main(String[] argv) throws Exception {
/* 创建连接工厂 */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
/* 创建连接 */
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 保证在接收端一个消息没有处理完时不会接收另一个消息,即接收端发送了ack后才会接收下一个消息。在这种情况下发送端会尝试把消息发送给下一个not busy的接收端
channel.basicQos(1);
final DefaultConsumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Worker1 Received '" + message + "'");
try {
doWork(message);
}catch (Exception e){
channel.abort();
}finally {
System.out.println("Worker1 Done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出
boolean autoAck=false;
//消息消费完成确认
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String task) {
try {
Thread.sleep(1000); // 暂停1秒钟
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
ps:消费者1和消费者2代码相同。
结果:
生产者 消费者1 消费者2- 场景3:Publish/Subscribe
使用场景:发布、订阅模式,发送端发送广播消息,多个接收端接收。
发布/订阅生产者:
package com.rabbitmq.mq.subscribe;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
/* 创建连接 */
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//发送消息到一个名为“logs”的exchange上,使用“fanout”方式发送,即广播消息 所有的消费者 得到同样的队列信息
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//分发信息
for (int i=0;i<5;i++){
String message="Hello World"+i;
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("EmitLog Sent '" + message + "'");
}
channel.close();
connection.close();
}
}
消费者:
public class Consumer {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//产生一个随机的队列名称
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定
System.out.println("ReceiveLogs1 Waiting for messages");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogs1 Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);//队列会自动删除
}
}
结果:
生产者 订阅者
- 场景4:Routing (按路线发送接收)
使用场景:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。
路由生产者:
public class RoutingProducer {
private static final String EXCHANGE_NAME = "direct_logs";
// 路由关键字
private static final String[] routingKeys = new String[]{"info", "warning", "error"};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");//注意是direct
//发送信息
for (String routingKey : routingKeys) {
String message = "RoutingSendDirect Send the message level:" + routingKey;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println("RoutingSendDirect Send" + routingKey + "':'" + message);
}
channel.close();
connection.close();
}
}
消费者1:
/**
* 消费者1
*/
public class RoutingWorker1 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_logs";
// 路由关键字
private static final String[] routingKeys = new String[]{"info" ,"warning"};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//获取匿名队列名称
String queueName=channel.queueDeclare().getQueue();
//根据路由关键字进行绑定
for (String routingKey:routingKeys){
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+"," +
" queue:"+queueName+", BindRoutingKey:" + routingKey);
}
System.out.println("ReceiveLogsDirect1 Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
消费者2:
/**
* 消费者2
*/
public class RoutingWorker2 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_logs";
// 路由关键字
private static final String[] routingKeys = new String[]{"error"};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//获取匿名队列名称
String queueName=channel.queueDeclare().getQueue();
//根据路由关键字进行绑定
for (String routingKey:routingKeys){
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+"," +
" queue:"+queueName+", BindRoutingKey:" + routingKey);
}
System.out.println("ReceiveLogsDirect1 Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
结果:
生产者 消费者1 消费者2
- 场景5:Topics (按topic发送接收)
使用场景:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。(这种应该属于模糊匹配)
topic生产者:
public class TopicProducer {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
//声明一个匹配模式的交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//待发送的消息
String[] routingKeys=new String[]{
"quick.orange.rabbit",
"lazy.orange.elephant",
"quick.orange.fox",
"lazy.brown.fox",
"quick.brown.fox",
"quick.orange.male.rabbit",
"lazy.orange.male.rabbit"
};
//发送消息
for(String severity :routingKeys){
String message = "From "+severity+" routingKey' s message!";
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println("TopicSend Sent '" + severity + "':'" + message + "'");
}
channel.close();
connection.close();
}
}
消费者1:
public class TopicConsumer1 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明一个匹配模式的交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
//路由关键字
String[] routingKeys = new String[]{"*.orange.*"};
//绑定路由
for (String routingKey : routingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey);
}
System.out.println("ReceiveLogsTopic1 Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
消费者2:
public class TopicConsumer2 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明一个匹配模式的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
// 路由关键字
String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
// 绑定路由关键字
for (String bindingKey : routingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
}
System.out.println("ReceiveLogsTopic2 Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
结果:
生产者 消费者1 消费者2
网友评论