为啥要用MQ
1. 消费方不需要实时等待依赖上一个任务的执行结果,只要生产者随时发送消息,消费者随时可接受消息调用
2.生产者不需要关心消费者的执行结果,直接调用的话会影响生产者的执行结果
RabbitMQ基本概念

- exchange(交换器)发送到不同的Queue
- Queue用来存储生产者发送的消息,也让消费者消费的
- Routing Key(路由Key),生产者发送消息时附带Routing Key选择对应的Queue发送消息
- Binding(绑定key),Queue和分发器之间的桥梁、Routing Key选择合适的桥梁到达对应的Queue。
- prefetch count(预取计数):用于指定消费者从Queue中每一次预取得消息、执行完城后才能接着去取。
-
Exchange Type-fanout(广播式),无论Routing Key是什么,所有Queue都会收到消息。
EXCHANGE广播试.jpg
-
2、 Exchange Type-direct(直接式):Binding Key和Routing Key相同才能收到消息。如下图
EXCHANGE直接式.jpg
-
3、 Exchange Type-topic(主题式):模糊匹配,Binding Key和Routing Key都是被点"."分个开的多个"单词",如"story.ses.nse",使用通配符*(代表单个"单词"),#(代表多个"单词")进行匹配。如下图
EXCHANGE主题式.jpg
4、Exchange Type-topic(主题式)是项目中经常使用的MQ方式,故特此举例:
routingkey由一些单词,中间由.分割,单词可以任意给,但总长度不能超过255个字符。对应的bindingkey也对应是这种形式,绑定键有两种特殊的字符,和#

对于上图的例子,我们将会发送描述动物的消息。这些消息将会以由三个单词组成的路由键发送。路由键中的第一个单词描述了速度,第二个描述了颜色,第三个描述了物种:<speed>.<colour>.<species>。
我们创建了三个绑定,Q1的绑定键为.orange.,Q2的绑定键有两个,分别是..rabbit和lazy.#。
上述绑定关系可以描述为:
①Q1关注所有颜色为orange的动物。
②Q2关注所有的rabbit,以及所有的lazy的动物。
- 如果一个消息的路由键是quick.orange.rabbit,那么Q1和Q2都可以接收到,
- 路由键是lazy.orange.elephant的消息同样如此。
- 路由键是quick.orange.fox的消息只会到达Q1,路由键是lazy.brown.fox的消息只会到达Q2。注意,路由键为lazy.pink.rabbit的消息只会到达Q2一次,尽管它匹配了两个绑定键。
- 路由键为quick.brown.fox的消息因为不和任意的绑定键匹配,所以将会被丢弃。
- 路由键为orange或者quick.orange.male.rabbit,那这个消息不与任何绑定键匹配,则会丢弃。
生产者code举例(官网):
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
//..
}
消费者code举例:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
- 如果bingdkey输入#,启动第一个消费者,将会接受所有的消息;
- bindingkey输入kern.*,启动第二个消费者,将会受到kern前缀的所有消息;
- bindingkey输入*.critical,启动第三个消费者,将会收到critical后结尾的所有消息
- bindingkey输入"kern.", ".critical",启动第四个消费者,将会受到第二个消费者和第三个消费者的所有消息;
网友评论