美文网首页
使用RabbitMQ

使用RabbitMQ

作者: 米来MiLai | 来源:发表于2019-03-28 15:07 被阅读0次

    概念

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang框架上的。

    应用场景

    场景一:支付的通知
    生产者:微信支付完成之后在其回调方法中调用一个服务接收消息,这个服务作为生产者。
    消费者:消费者服务是一个不断从队列中获取支付结果的应用,然后在app或者页面展示。
    场景二:注册的短信或者邮件通知
    生产者:注册成功之后的回调中,发送注册成功信息到队列生产者。
    消费者:应用程序不断的获取队列中的消息,获取到就发送短信后者邮件。


    image.png

    应用 .Net Core with RabbitMQ

    1. Erlang运行环境,并配置环境变量 ERLANG_HOME:D:\Program Files\erl10.3
    2. RabbitMQ-win64版,并配置环境变量 RABBITMQ_SERVER:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.13,增加path变量 %RABBITMQ_SERVER%\sbin
    • 简单配置后,我们的RabbitMQ已经可以正常使用了,开始菜单找到并管理员启动

      RabbitMQ Command Prompt
    • 键入以下命令,启动服务(服务默认是启动的)

    rabbitmq-service install
    rabbitmq-service enable
    rabbitmq-service start
    
    • 查看服务状态和监听端口
    rabbitmqctl status
    
    rabbitmqctl status
    • 增删改查用户
    rabbitmqctl list_users
    rabbitmqctl add_user  admin  admin
    rabbitmqctl set_permissions  admin  ".*"  ".*"  ".*"
    rabbitmqctl set_user_tags admin administrator
    rabbitmqctl delete_user guest
    rabbitmqctl change_password {username}  {newpassowrd}
    

    Run

    3.1.消息的发送和接收

    生产者
    static void Main (string[] args) {
          var factory = new ConnectionFactory ();
          factory.HostName = "localhost";
          factory.UserName = "admin";
          factory.Password = "admin";
    
          using (var connection = factory.CreateConnection ()) {
            using (var channel = connection.CreateModel ()) {
              channel.QueueDeclare ("hello", true, false, false, null);
              string message = "Hello,RabbitMQ!";
              //5. 构建byte消息数据包
              var body = Encoding.UTF8.GetBytes (message);
              //6. 发送数据包
              channel.BasicPublish ("", "hello", null, body);
            }
          }
        }
    
    消费者
    static void Main (string[] args) {
                //1. 实例化连接工厂
                var factory = new ConnectionFactory ();
                factory.HostName = "localhost";
                factory.UserName = "admin";
                factory.Password = "admin";
                //2. 建立连接
                using (var connection = factory.CreateConnection ()) {
                    //3. 创建信道
                    using (var channel = connection.CreateModel ()) {
                        //4. 申明队列
                        channel.QueueDeclare ("hello", true, false, false, null);
                        //5. 构造消费者实例
                        var consumer = new EventingBasicConsumer (channel);
                        //6. 绑定消息接收后的事件委托
                        consumer.Received += (model, ea) => {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString (body);
                            Console.WriteLine ("已接收: {0}", message);
                        };
                        //7. 启动消费者
                        channel.BasicConsume ("hello", false, consumer);
                        Console.ReadLine ();
                    }
                }
            }
    

    运行结果


    image.png

    3.2 循环调度

    3.3 消息确认

    //6. 绑定消息接收后的事件委托
    consumer.Received += (model, ea) => {
              var body = ea.Body;
              var message = Encoding.UTF8.GetString (body);
              Console.WriteLine ("已接收: {0}", message);
              channel.BasicAck (ea.DeliveryTag, false);//消息确认
              };
    channel.BasicConsume ("hello", false, consumer); //channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    

    消费者确认或者说消费者应答指的是RabbitMQ需要确认消息到底有没有被收到。在订阅消息的时候可以指定应答模式,当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。


    有10条消息在队列中 未应答10条消息 应答后,消息被消化
    • 指定一次获取一条消息
    channel.basicQos(1);此时如果没有应答的话,消费者将不再继续获取
    

    注意:如果都没有手动应答,在没有指定获取消息的条数时,消费者可以获取所有消息,当指定时,只能获取指定条,下次就只能等待了,没法继续获取下一条了

    • 手工拒绝
    // requeue:重新入队列,false:直接丢弃,相当于告诉队列可以直接删除掉
    channel.basicReject(envelope.getDeliveryTag(), false);
    当前消息会被消耗掉
    
    被拒绝后消息删除

    3.4 消息持久化

    消息交换机(exchange)和消息队列(queue)都是持久化的话,那么他们之间的绑定(Binding)也是持久化的。如果消息交换机和消息队列之间一个持久化、一个非持久化,那么就不允许绑定。

    3.5 公平分发

    关闭客户端后 27号消息丢失
    最后补上

    4 Exchange

    4.1 direct

    只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。

    * 将交换器与队列通过路由键绑定*/
     channel.queueBind(QUEUR_NAME, EXCHANGE_NAME, ROUTING_KEY);
    
    image.png

    4.2 topic

    此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'','#'.
    其中'
    '表示匹配一个单词, '#'则表示匹配没有或者多个单词

    image.png

    4.3 fanout

    此exchange的路由规则很简单直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。
    channel.ExchangeDeclare("weilai","fanout");
    
    image.png

    4.4 header

    此类型的exchange和以上三个都不一样,其路由的规则是根据header来判断,其中的header就是以下方法的arguments参数
    channel.QueueBind(queue: "queue.A", exchange: "agreements", routingKey: string.Empty,  arguments: aHeader);
    

    相关文章

      网友评论

          本文标题:使用RabbitMQ

          本文链接:https://www.haomeiwen.com/subject/xeiqbqtx.html