美文网首页
C# & RabbitMQ 之 Exchanges

C# & RabbitMQ 之 Exchanges

作者: SMILE_NO_09 | 来源:发表于2018-03-15 16:34 被阅读208次

    Exchanges

    在前面的学习中其实已经接触了exchanges,通常情况下,producer 不需要知道发送消息给哪一个queue,只需要发送messages给exchange就足以,exchange一端接受producers的messages,另一端push message到queues。为了准确的处理messages,定义了exchange type:direct, topic, headers ,fanout。

    Fanout Exchange

    fanout exchange将messages广播到到已经绑定了的queues中。每个queue都会收到producer发送的信息。


    fanout exchange

    fanout exchange代码:
    producer:

    static void Main(string[] args)
            {
                //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    
                Console.WriteLine("********************fanout producer***************");
                Console.WriteLine("please Input send message:");
                //连接到RabbitMQ
    
                var factory = new RabbitMQ.Client.ConnectionFactory();
                //第一种方式
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.VirtualHost = "/";
                factory.HostName = "10.19.52.80";
    
                //第二种方式
                //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
                //产生一个连接对象
                using (var conncetion = factory.CreateConnection())
                {
                    //通过conncetion产生一个连接通道
                    using (var channel = conncetion.CreateModel())
                    {
                        //用代码实现 exchanges和Queues 
                        //定义exchanges
                        string exchangeName = "Efanout_test";
                        //设置类型 Fanout
                        channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
                        //定义Queues
                        string queueName1 = "qfanout_test1";
                        string queueName2 = "qfanout_test2";
    
                        bool durable = true;//设RabbitMQ置持久化
    
                        channel.QueueDeclare(queueName1, durable, false, false, null);
                        channel.QueueDeclare(queueName2, durable, false, false, null);
                        //绑定 queue 与exchange
                        channel.QueueBind(queueName1, exchangeName, "", null);
                        channel.QueueBind(queueName2, exchangeName, "", null);
    
                        for (int i = 0; i < 20; i++)
                        {
                            string message = i.ToString();
    
                            var body = Encoding.UTF8.GetBytes(message);
                            var properties = channel.CreateBasicProperties();
                            properties.Persistent = true;
                            channel.BasicPublish(exchange: "Efanout_test", routingKey: "", mandatory: false, basicProperties: properties, body: body);
                            Console.WriteLine("[producer] send : {0}", message);
                            Thread.Sleep(1000);
                        }
                    }
                }
                Console.ReadLine();
            }
    

    接受端代码:

      static void Main(string[] args)
            {
                //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    
                Console.WriteLine("********************fanout c1***************");
                //连接MQ
                var factory = new ConnectionFactory();
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.VirtualHost = "/";
                factory.HostName = "10.19.52.80";
    
                //产生连接对象
                using (var connection = factory.CreateConnection())
                {
                    //通道
                    using (var channel = connection.CreateModel())
                    {
                        //公平调用
                        //channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);
    
                        //订阅方式获取message
                        var consumer = new EventingBasicConsumer(channel);
                        //实现获取message处理事件
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
    
    
                            Console.WriteLine("[qfanout_test1] received : {0}", message);
    
                            //手动设置回复
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        };
                        //设置手动回复认证 接受队列名称
                        channel.BasicConsume(queue: "qfanout_test1", autoAck: false, consumer: consumer);
                        //另一个的参数
                        //channel.BasicConsume(queue: "qfanout_test2", autoAck: false, consumer: consumer);
                        Console.ReadLine();
                    }
                }
            }
    

    web管理界面绑定关系:


    绑定关系

    运行结果:


    fanout exchange

    Direct exchange

    前面的fanout 类型中,只要exchange与queue绑定,message 发送给所有与exchange有绑定关系的queue中,但是有时候不是我们只希望传递message到某些queue中时就需要用到direct exchange,在QueueBind时添加routing key来实现。


    Direct exchange

    代码实现:
    Direct_P:

    static void Main(string[] args)
            {
                //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    
                Console.WriteLine("********************Direct P***************");
                Console.WriteLine("please Input send message:");
                //连接到RabbitMQ
    
                var factory = new RabbitMQ.Client.ConnectionFactory();
                //第一种方式
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.VirtualHost = "/";
                factory.HostName = "10.19.52.80";
    
                //第二种方式
                //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
                //产生一个连接对象
                using (var conncetion = factory.CreateConnection())
                {
                    //通过conncetion产生一个连接通道
                    using (var channel = conncetion.CreateModel())
                    {
                        //用代码实现 exchanges和Queues 
                        //定义exchanges
                        string exchangeName = "EDirect_test";
                        //设置类型 Fanout
                        channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
                        //定义Queues
                        string queueName1 = "qDirect_test1";
                        string queueName2 = "qDirect_test2";
                        bool durable = true;//设RabbitMQ置持久化
    
    
                        channel.QueueDeclare(queueName1, durable, false, false, null);
                        channel.QueueDeclare(queueName2, durable, false, false, null);
                        //绑定 queue 与exchange
                        //routingkey info  waring error 
                        channel.QueueBind(queueName1, exchangeName, "info", null);
                        channel.QueueBind(queueName2, exchangeName, "error", null);
                        channel.QueueBind(queueName2, exchangeName, "waring", null);
    
    
                        for (int i = 0; i < 20; i++)
                        {
                            string message = null;
    
                            var properties = channel.CreateBasicProperties();
                            properties.Persistent = true;
                            string routingkey = null;
    
                            if (i%3 == 0)
                            {
                                message = "error";
                                routingkey = "error";
                            }
                            else if (i%3 == 1)
                            {
                                message = "waring";
                                routingkey = "waring";
                            }
                            else
                            {
                                message = "info";
                                routingkey = "info";
                            }
    
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish( "EDirect_test", routingkey,  false, properties,  body);
                            Console.WriteLine("[Direct P] send : {0}", message);
                            Thread.Sleep(1000);
                        }
                    }
                }
                Console.ReadLine();
            }
    

    Direct_C:

    static void Main(string[] args)
            {
                //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    
                Console.WriteLine("********************Direct_C2***************");
                //连接MQ
                var factory = new ConnectionFactory();
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.VirtualHost = "/";
                factory.HostName = "10.19.52.80";
    
                //产生连接对象
                using (var connection = factory.CreateConnection())
                {
                    //通道
                    using (var channel = connection.CreateModel())
                    {
                        //公平调用
                        //channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);
    
                        //订阅方式获取message
                        var consumer = new EventingBasicConsumer(channel);
                        //实现获取message处理事件
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            var routingKey = ea.RoutingKey;
                            Console.WriteLine("[Direct_C2] received : {0}--routingkey {1}", message,routingKey);
    
                            //手动设置回复
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        };
                        //设置手动回复认证
                        channel.BasicConsume(queue: "qDirect_test2", autoAck: false, consumer: consumer);
                        Console.ReadLine();
                    }
                }
            }
    

    代码实现的结构如图:



    运行结果:


    Direct exchange实现效果
    Web后台绑定关系:
    后台绑定关系

    Topic exchange

    简单的理解Topic exchange是在Direct exchange上的扩展,routing_key不局限于完全匹配,而是像一种正则的样子去匹配。不过只有两个特殊通配符:
    * 号用来匹配一个单词,比如"quick.orange.rabbit" 就可以用*.orange. 匹配到*
    #号用来匹配0到多个单词,比如“lazy.orange.male.rabbit”可以用lazy.# 匹配

    Topic exchange
    ETopic_P.cs
    static void Main(string[] args)
            {
                //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    
                Console.WriteLine("********************ETopic P***************");
                Console.WriteLine("please Input send message:");
                //连接到RabbitMQ
    
                var factory = new RabbitMQ.Client.ConnectionFactory();
                //第一种方式
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.VirtualHost = "/";
                factory.HostName = "10.19.52.80";
    
                //第二种方式
                //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
                //产生一个连接对象
                using (var conncetion = factory.CreateConnection())
                {
                    //通过conncetion产生一个连接通道
                    using (var channel = conncetion.CreateModel())
                    {
                        //用代码实现 exchanges和Queues 
                        //定义exchanges
                        string exchangeName = "ETopic_test";
                        //设置类型 Fanout
                        channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                        //定义Queues
                        string queueName1 = "qTopic_test1";
                        string queueName2 = "qTopic_test2";
                        string queueName3 = "qTopic_test3";
                        bool durable = true;//设RabbitMQ置持久化
    
    
                        channel.QueueDeclare(queueName1, durable, false, false, null);
                        channel.QueueDeclare(queueName2, durable, false, false, null);
                        channel.QueueDeclare(queueName3, durable, false, false, null);
                        //绑定 queue 与exchange
                        //routingkey info  waring error 
                        channel.QueueBind(queueName1, exchangeName, "log.#", null);
                        channel.QueueBind(queueName2, exchangeName, "*.error", null);
                        channel.QueueBind(queueName3, exchangeName, "*.waring", null);
    
                        for (int i = 0; i < 20; i++)
                        {
                            string message = null;
    
                            //var properties = channel.CreateBasicProperties();
                            //properties.Persistent = true;
                            string routingkey = null;
    
                            if (i % 3 == 0)
                            {
                                message = "error";
                                routingkey = "log.error";
                            }
                            else if (i % 3 == 1)
                            {
                                message = "waring";
                                routingkey = "log.waring";
                            }
                            else
                            {
                                message = "info";
                                routingkey = "log.waring.error";
                            }
    
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish("ETopic_test", routingkey, false, null, body);
                            Console.WriteLine("[ETopic P] send : {0}", message);
                            Thread.Sleep(1000);
                        }
                    }
                }
                Console.ReadLine();
            }
    

    ETopic_C.cs

    static void Main(string[] args)
            {
                //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    
                Console.WriteLine("********************Topic_C3***************");
                //连接MQ
                var factory = new ConnectionFactory();
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.VirtualHost = "/";
                factory.HostName = "10.19.52.80";
    
                //产生连接对象
                using (var connection = factory.CreateConnection())
                {
                    //通道
                    using (var channel = connection.CreateModel())
                    {
                        //公平调用
                        //channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);
    
                        //订阅方式获取message
                        var consumer = new EventingBasicConsumer(channel);
                        //实现获取message处理事件
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            var routingKey = ea.RoutingKey;
                            Console.WriteLine("[Topic_C3] received : {0}--routingkey {1}", message, routingKey);
    
                            //手动设置回复
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        };
                        //设置手动回复认证
                        channel.BasicConsume(queue: "qTopic_test3", autoAck: false, consumer: consumer);
                        Console.ReadLine();
                    }
                }
            }
    
    Topic exchange运行结果
    Web后台绑定关系

    相关文章

      网友评论

          本文标题:C# & RabbitMQ 之 Exchanges

          本文链接:https://www.haomeiwen.com/subject/fflaxftx.html