概念
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang框架上的。
应用场景
场景一:支付的通知
生产者:微信支付完成之后在其回调方法中调用一个服务接收消息,这个服务作为生产者。
消费者:消费者服务是一个不断从队列中获取支付结果的应用,然后在app或者页面展示。
场景二:注册的短信或者邮件通知
生产者:注册成功之后的回调中,发送注册成功信息到队列生产者。
消费者:应用程序不断的获取队列中的消息,获取到就发送短信后者邮件。
image.png
应用 .Net Core with RabbitMQ
- Erlang运行环境,并配置环境变量 ERLANG_HOME:D:\Program Files\erl10.3
- RabbitMQ-win64版,并配置环境变量 RABBITMQ_SERVER:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.13,增加path变量 %RABBITMQ_SERVER%\sbin
-
简单配置后,我们的RabbitMQ已经可以正常使用了,开始菜单找到并管理员启动
RabbitMQ Command Prompt
-
键入以下命令,启动服务(服务默认是启动的)
rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start
- 查看服务状态和监听端口
rabbitmqctl status
rabbitmqctl status
- 增删改查用户
rabbitmqctl list_users
rabbitmqctl add_user admin admin
rabbitmqctl set_permissions admin ".*" ".*" ".*"
rabbitmqctl set_user_tags admin administrator
rabbitmqctl delete_user guest
rabbitmqctl change_password {username} {newpassowrd}
Run
3.1.消息的发送和接收
生产者
static void Main (string[] args) {
var factory = new ConnectionFactory ();
factory.HostName = "localhost";
factory.UserName = "admin";
factory.Password = "admin";
using (var connection = factory.CreateConnection ()) {
using (var channel = connection.CreateModel ()) {
channel.QueueDeclare ("hello", true, false, false, null);
string message = "Hello,RabbitMQ!";
//5. 构建byte消息数据包
var body = Encoding.UTF8.GetBytes (message);
//6. 发送数据包
channel.BasicPublish ("", "hello", null, body);
}
}
}
消费者
static void Main (string[] args) {
//1. 实例化连接工厂
var factory = new ConnectionFactory ();
factory.HostName = "localhost";
factory.UserName = "admin";
factory.Password = "admin";
//2. 建立连接
using (var connection = factory.CreateConnection ()) {
//3. 创建信道
using (var channel = connection.CreateModel ()) {
//4. 申明队列
channel.QueueDeclare ("hello", true, false, false, null);
//5. 构造消费者实例
var consumer = new EventingBasicConsumer (channel);
//6. 绑定消息接收后的事件委托
consumer.Received += (model, ea) => {
var body = ea.Body;
var message = Encoding.UTF8.GetString (body);
Console.WriteLine ("已接收: {0}", message);
};
//7. 启动消费者
channel.BasicConsume ("hello", false, consumer);
Console.ReadLine ();
}
}
}
运行结果
image.png
3.2 循环调度
3.3 消息确认
//6. 绑定消息接收后的事件委托
consumer.Received += (model, ea) => {
var body = ea.Body;
var message = Encoding.UTF8.GetString (body);
Console.WriteLine ("已接收: {0}", message);
channel.BasicAck (ea.DeliveryTag, false);//消息确认
};
channel.BasicConsume ("hello", false, consumer); //channel.basicConsume(QUEUE_NAME, autoAck, consumer);
消费者确认或者说消费者应答指的是RabbitMQ需要确认消息到底有没有被收到。在订阅消息的时候可以指定应答模式,当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。
有10条消息在队列中 未应答10条消息 应答后,消息被消化
- 指定一次获取一条消息
channel.basicQos(1);此时如果没有应答的话,消费者将不再继续获取
注意:如果都没有手动应答,在没有指定获取消息的条数时,消费者可以获取所有消息,当指定时,只能获取指定条,下次就只能等待了,没法继续获取下一条了
- 手工拒绝
// requeue:重新入队列,false:直接丢弃,相当于告诉队列可以直接删除掉
channel.basicReject(envelope.getDeliveryTag(), false);
当前消息会被消耗掉
被拒绝后消息删除
3.4 消息持久化
消息交换机(exchange)和消息队列(queue)都是持久化的话,那么他们之间的绑定(Binding)也是持久化的。如果消息交换机和消息队列之间一个持久化、一个非持久化,那么就不允许绑定。
3.5 公平分发
关闭客户端后 27号消息丢失最后补上
4 Exchange
4.1 direct
只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。
* 将交换器与队列通过路由键绑定*/
channel.queueBind(QUEUR_NAME, EXCHANGE_NAME, ROUTING_KEY);
image.png
4.2 topic
此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'','#'.
其中''表示匹配一个单词, '#'则表示匹配没有或者多个单词
4.3 fanout
此exchange的路由规则很简单直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。
channel.ExchangeDeclare("weilai","fanout");
image.png
4.4 header
此类型的exchange和以上三个都不一样,其路由的规则是根据header来判断,其中的header就是以下方法的arguments参数
channel.QueueBind(queue: "queue.A", exchange: "agreements", routingKey: string.Empty, arguments: aHeader);
网友评论