美文网首页工作生活
RabbitMQ入门学习系列(三).消息发送接收

RabbitMQ入门学习系列(三).消息发送接收

作者: homehe | 来源:发表于2019-07-04 16:03 被阅读0次

    快速阅读

    ​ 用Rabitmq的队列管理,以及如何保证消息在队列中不丢失。通过ack的消息确认和持久化进行操作。 以及Rabbit中如何用Web面板进行管理队列。消费者如何处理耗时的任务

    生产者代码

    创建链接=》创建信道=》声明队列 。连续生产10条消息供消费者消费

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            for (var i = 0; i < 10; i++) //连续生产10条消息,让消费者消费
            {
                string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
    
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
    
                channel.BasicPublish(exchange: "",
                                     routingKey: "hello",
                                     basicProperties: null,
                                     body: body);
                Console.WriteLine(" [x] Sent {0}", message);
            }
    
    
        }
    
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    } 
    

    消费者代码

    创建链接=》创建信道=》声明队列 =>创建EventingBasicConsumer=》接收消息进行处理。

    如果挂断,消息会丢失。

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: false, exclusive: false, autoDelete: false, arguments: null);
    
                //以下是区别生产者的
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) =>
                {
                    var body = e.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("Received {0}", message);
                    Thread.Sleep(3000);//模拟耗时任务 ,
                    Console.WriteLine("Received over");
    
                };
                channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
                Console.WriteLine("");
                Console.ReadLine();
            }
    
        }
    
    }
    

    测试结果

    image

    从中我们可以看到,消费者每3秒消费一个任务 。

    消息确认

    如果一个消费者挂掉以后,怎么办呢?

    正常逻辑是RabbitMq把消费发送给消费者以后,会把消费从队列中删除 。

    但是如果消费者挂掉以后怎么办呢?因为这个时候消息已经发送出去,

    假如这个消息 在被消费者处理前挂掉了,我们就会丢失这个消费,

    为了避免这种问题的出现, 我们要用到消息确认机制,**就是当消费者处理完消息以后,再给rabbitmq一个确认信息,告诉他我已经处理好了,你可以删除了,RabbitMQ接收到以后,会从队列中把这个消息删除, 这就保证了消息会不会因消费者挂掉而丢失没有处理的消息 **。 **如果Rabbit没有接收到消息确认的通知(在超时之前) ,则会把这个消息再放到队列中,发送给另外的消费者。****

    我们把你代码改一下

    消费者代码中,加入ack发送的标志

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (sender, e) =>
    {
        var body = e.Body;
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine("Received {0}", message);
        Thread.Sleep(3000);//模拟耗时任务 ,
        Console.WriteLine("Received over");
        channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
    };
    

    发送者代码中加入发送的消息标识

     for (var i = 0; i < 10; i++)
     {
         string message = "Hello World!this is message "+i;
         var body = Encoding.UTF8.GetBytes(message);
         var properties = channel.CreateBasicProperties();
         properties.Persistent = true;
    
         channel.BasicPublish(exchange: "",
                              routingKey: "hello",
                              basicProperties: null,
                              body: body);
         Console.WriteLine(" [x] Sent {0},id={1}", message,i);
         Thread.Sleep(1000);
     }
    

    启动了三个消费者进程 ,但是发现队列中的任务 没有被消费完

    image

    还有id为6,7,8,9没有被消费, 这个时候是再重启一个消费者才可以消费完。

    image

    有点奇怪了。先放这里吧,做一个问题记录一下

    =》更新下进展

    晚上的时候查了一下。

    经常测试发现 要把autoAck设置为false才可以。

    channel.BasicConsume(queue: "HelloDurable1", autoAck: false, consumer: consumer);  //这个是正常的
    channel.BasicConsume(queue: "HelloDurable1", autoAck: true, consumer: consumer); //这个只能消费一部分,还需要重启才可以再消费
    
    • 经查autoAck 是否自动确认消息,true自动确认,false 不自动要手动调用,建立设置为false

    启动三个消费者测试发现正常 。

    1562162168225

    消息持久性

    我们还需要考虑到当RabbitMq.server挂掉的时候,消息也会丢失。

    为了避免此类问题:需要把消息和队列都标识为持久性。

    当我们标识为以后,重启程序时,发现报错了。

    image

    根据提示可以看出, 队列hello先前没有被标记为持久化,但已经存在了,我们不能改变他的属性,

    我们可以新建一个新的队列 。比如HelloDurable,就可以了。

    生产者和消费者两端都要修改。

    或者打开Rabbitmq的监控把队列进行删除

    RabbitMq监控

    先开始管理程序

    C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin>rabbitmq-plugins en
    able rabbitmq_management
    

    [图片上传失败...(image-c6edc9-1562227373030)]

    查看安装

    C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin>rabbitmq-plugins.bat list
    

    [图片上传失败...(image-b4f05f-1562227373030)]

    输入管理面板地址

    http://127.0.0.1:15672/

    用户名:guest ;密码 guest

    [图片上传失败...(image-3df944-1562227373030)]

    登陆进去以后,找到队列列表,删除相应的队列就可以了。

    1562162380181 1562162391890

    队列持久化的声明

     channel.QueueDeclare(queue: "HelloDurable",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
    

    消费持久化的声明

    var properties = channel.CreateBasicProperties();
     properties.Persistent = true;
    

    这样即使服务器重启消息也不会丢失的。

    消息负载均衡

    为了避免有些消费者不能获得资源,有些消费者获得资源过多的情况,我们要做如下配置

    在消费者代码中增加

    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    

    表示每次取一个消息。

    通过使用消息确认标识和配置消息持久性,让我们的消息可以持久化和不会被丢失。

    友情提示

    我对我的文章负责,发现好多网上的文章 没有实践,都发出来的,让人走很多弯路,如果你在我的文章中遇到无法实现,或者无法走通的问题。可以直接在公众号《爱码农爱生活 》留言。必定会再次复查原因。让每一篇 文章都能顺利实现。道理讲明白 。原理讲清楚。代码必实现

    相关文章

      网友评论

        本文标题:RabbitMQ入门学习系列(三).消息发送接收

        本文链接:https://www.haomeiwen.com/subject/lwjrhctx.html