美文网首页
C# & RabbitMQ 之 Work Queues

C# & RabbitMQ 之 Work Queues

作者: SMILE_NO_09 | 来源:发表于2018-03-02 13:23 被阅读112次

    Work Queues介绍

    Work Queues简而言之就是Producer将Message发送到Queues中,公平调度的发送到个个worker处理。

    Work Queues模型
    值得注意的地方,在消息接受过程中,worker会遇到异常而崩溃,导致接收到的消息处理失败,但是Queues发送Message并不知道这个是否已经正确处理而自动删除这条message。这样会导致Message的丢失,所以需要实现手动Message acknowledgment。当处理成功是告知RabbitMQ 这条message处理OK并删除。
    除此之外还有一个Message的丢失风险,就是当RabbitMQ 退出或者异常崩溃时,会导致queue和message的丢失,所以也要配置Message durability(持久化)。
    公平调度(Fair dispatch),RabbitMQ默认是平均分配message到各个worker。为防止出现某些worker因为处理比较复杂,大量的数据而一直处理繁忙状态,其他的worker却处于闲置状态,还不停的进行调度繁忙的worker,需要使用basicQos 方法设置 prefetchCount = 1 ,就是告知RabbitMQ 不要同时的给一个worker大于1条Message。
    Fair dispatch

    演示代码

    producer:

    static void Main(string[] args)
            {
                //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    
                Console.WriteLine("********************producer***************");
                Console.WriteLine("please Input send message:");
                //连接到RabbitMQ
    
                var factory = new RabbitMQ.Client.ConnectionFactory();
                //第一种方式
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.VirtualHost = "/";
                factory.HostName = "10.19.52.80";
    
                //第二种方式
                //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
                //产生一个连接对象
                using (var conncetion = factory.CreateConnection())
                {
                    //通过conncetion产生一个连接通道
                    using (var channel = conncetion.CreateModel())
                    {
                        //用代码实现 exchanges和Queues 
                        //定义exchanges
                        string exchangeName = "Ewrokqueues";
                        channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
                        //定义Queues
                        string queueName = "Qwrokqueues";
                        bool durable = true;//设RabbitMQ置持久化
                        channel.QueueDeclare(queueName, durable, false, false, null);
                        //绑定exchanges 和Queues
                        string routingKey = "task_queue";
                        channel.QueueBind(queueName, exchangeName, "", null);
    
                        //简单设置队列方式
                        //channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false,
                        //    arguments: null);
    
                          for (int i =0;i<20;i++)
                          {
                              string message = i.ToString();
    
                                var body = Encoding.UTF8.GetBytes(message);
                                var properties = channel.CreateBasicProperties();
                                properties.Persistent = true;
                                channel.BasicPublish(exchange: "Ewrokqueues", routingKey: "", mandatory: false, basicProperties: properties, body: body);
                                Console.WriteLine("[producer] send : {0}", message);
                              Thread.Sleep(1000);
                            }
                    }
                }
                Console.ReadLine();
            }
    

    worker 代码:

    static void Main(string[] args)
           {
               //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    
               Console.WriteLine("********************worker1(sleep 5s)***************");
               //连接MQ
               var factory = new ConnectionFactory();
               factory.UserName = "admin";
               factory.Password = "admin";
               factory.VirtualHost = "/";
               factory.HostName = "10.19.52.80";
    
               //产生连接对象
               using (var connection = factory.CreateConnection())
               {
                   //通道
                   using (var channel = connection.CreateModel())
                   {
                       //公平调用
                       channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);
    
                       //订阅方式获取message
                       var consumer = new EventingBasicConsumer(channel);
                       //实现获取message处理事件
                       consumer.Received += (model, ea) =>
                       {
                           var body = ea.Body;
                           var message = Encoding.UTF8.GetString(body);
                           //睡眠5s 另一个是1s
                           Thread.Sleep(5000);
    
                           Console.WriteLine("[worker1] received : {0}", message);
    
                           //手动设置回复
                           channel.BasicAck(deliveryTag:ea.DeliveryTag,multiple:false);
                       };
                       //设置手动回复认证
                       channel.BasicConsume(queue: "Qwrokqueues", autoAck: false, consumer: consumer);
                       Console.ReadLine();
                   }
               }
           }
    

    P会循环发送20次,每秒发送一个数字到queue中,两个worker接受message。最后从运行结果可以看到整个分配情况,worker1第一个接受到“0”,在5秒处理完成后才接受“5”,而worker2会一直在处理,而不是出于等待闲置。


    运行结果

    相关文章

      网友评论

          本文标题:C# & RabbitMQ 之 Work Queues

          本文链接:https://www.haomeiwen.com/subject/qeygxftx.html