细说RabbitMQ

作者: peipeicn | 来源:发表于2018-05-03 12:31 被阅读17次
    RabbitMQ可以做什么?

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议。是应用层协议的一个开放标准,为面向消息的中间件设计。
    消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

    应用场景

    我们知道现在很多APP都可以推送。在管理员选定内容进行推送这个行为中。
    系统往往需要执行俩个操作:推送消息;记录是谁在什么时候推送的。
    但是对于管理员来说,他只关心推送完成了没有,而不关心是否产生了日志。

    传统的做法有2种:
    串行:推送消息,然后在记录日志,在俩个操作都完成之后告诉管理员推送完成。
    并行:推送消息的同时记录日志,在俩个操作都完成之后告诉管理员推送完成。

    OK。我们现在引入消息队列。
    引入消息队列之后,我们只需要在推送,然后日志放入消息队列中。然后就可以告诉管理员消息推送完成。
    而日志信息会存放在消息队列之中。消息队列的消费者会在系统不繁忙的时候进行处理。

    RabbitMQ术语
    生产者:

    消息发送者

    消费者:

    等待消息的程序

    Queue:

    队列,存放消息的

    Simple

    RabbitMQ安装即简单使用

    RabbitMQ四种exchange

    exchange

    如果发生紧急情况,我们的服务器宕掉的话,消息队列里的信息没了怎么办?

    消息持久化

    RabbitMQ为我们提供了消息持久化的手段
    首先是队列持久化,然后在是消息持久化

    如果消费者在消费当前消息的时候,突然崩掉,那么这条消息还在消息队列中吗?还是已经被消费掉了?

    消息响应机制

    RabbitMQ为我们提供了消息响应机制

    啰里啰唆半天,下边是代码。
    .net版本。

    //生产者
    var factory = new ConnectionFactory(){ HostName = "localhost", UserName = "guest", Password = "" };
    
    using (IConnection conn = factory.CreateConnection())
    {
           using (IModel im = conn.CreateModel())
           {
                  im.ExchangeDeclare("rabbitmq_route", ExchangeType.Direct);
                  im.QueueDeclare("rabbitmq_query", true, false, false, null);//第二个参数队列持久化
                 
                  im.QueueBind("rabbitmq_query", "rabbitmq_route", ExchangeType.Direct, null);
                  for (int i = 0; i < 5; i++)
                  {
                       var props = im.CreateBasicProperties();
                       props.SetPersistent(true);//消息持久化
                       byte[] message = Encoding.UTF8.GetBytes("Hello " + i);
                       im.BasicPublish("rabbitmq_route", ExchangeType.Direct, props, message);
                       Console.WriteLine("send:" + i);
                  }
           }
    }
    //消费者
    var factory = new ConnectionFactory(){ HostName = "localhost", UserName = "guest", Password = "" };
    using (IConnection conn = factory.CreateConnection())
    {
           using (IModel im = conn.CreateModel())
           {
                  while (true)
                  {
                            
                       BasicGetResult res = channel.BasicGet("rabbitmq_query", false);
                       if (res != null)
                       {
                            Console.WriteLine("receiver:" + UTF8Encoding.UTF8.GetString(res.Body));
                       }
                      Thread.Sleep(5000);
                      channel.BasicAck(res.DeliveryTag, true);//消息响应
                      Console.WriteLine("basiack end");          
                  }
           }
    }
    
    //using (IConnection conn = factory.CreateConnection())
                //{
                //    using (IModel im = conn.CreateModel())
                //    {
                //        im.ExchangeDeclare("rabbitmq_route_Fanout", ExchangeType.Fanout);// 路由
                //        int i = 0;
                //        while (true)
                //        {
                //            Thread.Sleep(1000);
                //            ++i;
                //            byte[] message = Encoding.UTF8.GetBytes(i.ToString());
                //            im.BasicPublish("rabbitmq_route_Fanout", "", null, message);
                //            Console.WriteLine("send:" + i.ToString());
                //        }
                //    }
                //}
    
     //using (IConnection conn = factory.CreateConnection())
                //{
                //    using (IModel im = conn.CreateModel())
                //    {
                //        im.ExchangeDeclare("rabbitmq_route_Fanout", ExchangeType.Fanout);
                //        var queueOk = im.QueueDeclare();//1
                //        im.QueueBind(queueOk.QueueName, "rabbitmq_route_Fanout", "");//2
                //        var consumer = new QueueingBasicConsumer(im);//3
                //        im.BasicConsume(queueOk.QueueName, true, consumer);//4
                //        while (true)
                //        {
                //            var _result = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//5
                //            var body = _result.Body;
                //            var message = Encoding.UTF8.GetString(body);
                //            Console.WriteLine("received:{0}", message);
                //        }
                //    }
                //}
    

    RabbitMQ.Client.dil 版本5.1.0-rc1

    相关文章

      网友评论

        本文标题:细说RabbitMQ

        本文链接:https://www.haomeiwen.com/subject/puscrftx.html