Exchanges
在前面的学习中其实已经接触了exchanges,通常情况下,producer 不需要知道发送消息给哪一个queue,只需要发送messages给exchange就足以,exchange一端接受producers的messages,另一端push message到queues。为了准确的处理messages,定义了exchange type:direct, topic, headers ,fanout。
Fanout Exchange
fanout exchange将messages广播到所有已经与它绑定的queues中,每个queue都会收到producer发送的信息。
fanout exchange
fanout exchange代码:
producer:
/// <summary>
/// Fanout producer demo: declares the "Efanout_test" fanout exchange, declares two
/// durable queues and binds them to it, then publishes the numbers 0..19 as
/// persistent messages, one per second.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    // Reference: http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    Console.WriteLine("********************fanout producer***************");
    // NOTE: this banner is printed but no input is read; the loop below generates the messages.
    Console.WriteLine("please Input send message:");

    // Connection settings, option 1: set the individual properties.
    var factory = new RabbitMQ.Client.ConnectionFactory();
    factory.UserName = "admin";
    factory.Password = "admin";
    factory.VirtualHost = "/";
    factory.HostName = "10.19.52.80";
    // Option 2: a single AMQP URI.
    //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");

    // Open a connection, then a channel on that connection.
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Declare the exchange and queues from code.
        // NOTE(review): the exchange is declared without an explicit durable flag while the
        // queues are durable - confirm whether the exchange should survive a broker restart too.
        string exchangeName = "Efanout_test";
        channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);

        bool durable = true; // queues are declared durable
        foreach (string queueName in new[] { "qfanout_test1", "qfanout_test2" })
        {
            channel.QueueDeclare(queueName, durable, false, false, null);
            // Bound with an empty routing key.
            channel.QueueBind(queueName, exchangeName, "", null);
        }

        // The properties are the same for every message, so build them once.
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        for (int i = 0; i < 20; i++)
        {
            string message = i.ToString();
            var body = Encoding.UTF8.GetBytes(message);
            // Publish to the exchange declared above (single source of truth for its name).
            channel.BasicPublish(exchange: exchangeName, routingKey: "", mandatory: false, basicProperties: properties, body: body);
            Console.WriteLine("[producer] send : {0}", message);
            Thread.Sleep(1000);
        }
    }

    // Keep the console window open after publishing finishes.
    Console.ReadLine();
}
接收端代码:
/// <summary>
/// Fanout consumer demo: subscribes to the "qfanout_test1" queue, prints every message
/// it receives and acknowledges each one manually. Runs until a line is entered on the console.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    // Reference: http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    Console.WriteLine("********************fanout c1***************");

    // Broker connection settings.
    var factory = new ConnectionFactory
    {
        UserName = "admin",
        Password = "admin",
        VirtualHost = "/",
        HostName = "10.19.52.80"
    };

    // Open a connection, then a channel on it; both are disposed when the block exits.
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Fair dispatch is left disabled; uncomment to enable it.
        //channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);

        // Event-based (push) consumption.
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, delivery) =>
        {
            string text = Encoding.UTF8.GetString(delivery.Body);
            Console.WriteLine("[qfanout_test1] received : {0}", text);
            // Manual acknowledgement of this single delivery.
            channel.BasicAck(delivery.DeliveryTag, false);
        };

        // autoAck is off, so the handler above must ack each message.
        channel.BasicConsume(queue: "qfanout_test1", autoAck: false, consumer: consumer);
        // A second consumer would use the other queue instead:
        //channel.BasicConsume(queue: "qfanout_test2", autoAck: false, consumer: consumer);

        // Block here so the channel stays open while messages arrive.
        Console.ReadLine();
    }
}
web管理界面绑定关系:
绑定关系
运行结果:
fanout exchange
Direct exchange
前面的fanout 类型中,只要exchange与queue绑定,message 就会发送给所有与exchange有绑定关系的queue。但是有时候我们只希望把message传递到某些queue中,这时就需要用到direct exchange,在QueueBind时指定routing key来实现。
Direct exchange
代码实现:
Direct_P:
/// <summary>
/// Direct producer demo: declares the "EDirect_test" direct exchange, binds
/// "qDirect_test1" to routing key "info" and "qDirect_test2" to "error" and "waring",
/// then publishes 20 persistent messages cycling through error / waring / info,
/// one per second. Each message body equals its routing key.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    // Reference: http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    Console.WriteLine("********************Direct P***************");
    // NOTE: this banner is printed but no input is read; the loop below generates the messages.
    Console.WriteLine("please Input send message:");

    // Connection settings, option 1: set the individual properties.
    var factory = new RabbitMQ.Client.ConnectionFactory();
    factory.UserName = "admin";
    factory.Password = "admin";
    factory.VirtualHost = "/";
    factory.HostName = "10.19.52.80";
    // Option 2: a single AMQP URI.
    //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");

    // Open a connection, then a channel on that connection.
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Declare the exchange and queues from code. The exchange type is Direct.
        string exchangeName = "EDirect_test";
        channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);

        string queueName1 = "qDirect_test1";
        string queueName2 = "qDirect_test2";
        bool durable = true; // queues are declared durable
        channel.QueueDeclare(queueName1, durable, false, false, null);
        channel.QueueDeclare(queueName2, durable, false, false, null);

        // Bindings: queue 1 gets "info"; queue 2 gets "error" and "waring".
        // The key "waring" (sic) is kept as-is because bindings and publishes must match.
        channel.QueueBind(queueName1, exchangeName, "info", null);
        channel.QueueBind(queueName2, exchangeName, "error", null);
        channel.QueueBind(queueName2, exchangeName, "waring", null);

        // The properties are the same for every message, so build them once.
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        // i % 3 == 0 -> error, 1 -> waring, 2 -> info.
        string[] levels = { "error", "waring", "info" };
        for (int i = 0; i < 20; i++)
        {
            string level = levels[i % levels.Length];
            var body = Encoding.UTF8.GetBytes(level);
            // The level is used both as the routing key and as the message text.
            channel.BasicPublish(exchangeName, level, false, properties, body);
            Console.WriteLine("[Direct P] send : {0}", level);
            Thread.Sleep(1000);
        }
    }

    // Keep the console window open after publishing finishes.
    Console.ReadLine();
}
Direct_C:
/// <summary>
/// Direct consumer demo: subscribes to the "qDirect_test2" queue, prints each message
/// together with the routing key it arrived with, and acknowledges it manually.
/// Runs until a line is entered on the console.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    // Reference: http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    Console.WriteLine("********************Direct_C2***************");

    // Broker connection settings.
    var factory = new ConnectionFactory
    {
        UserName = "admin",
        Password = "admin",
        VirtualHost = "/",
        HostName = "10.19.52.80"
    };

    // Open a connection, then a channel on it; both are disposed when the block exits.
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Fair dispatch is left disabled; uncomment to enable it.
        //channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);

        // Event-based (push) consumption.
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, delivery) =>
        {
            string text = Encoding.UTF8.GetString(delivery.Body);
            string key = delivery.RoutingKey;
            Console.WriteLine("[Direct_C2] received : {0}--routingkey {1}", text, key);
            // Manual acknowledgement of this single delivery.
            channel.BasicAck(delivery.DeliveryTag, false);
        };

        // autoAck is off, so the handler above must ack each message.
        channel.BasicConsume(queue: "qDirect_test2", autoAck: false, consumer: consumer);

        // Block here so the channel stays open while messages arrive.
        Console.ReadLine();
    }
}
代码实现的结构如图:
运行结果:
Direct exchange实现效果
Web后台绑定关系:
后台绑定关系
Topic exchange
简单的理解Topic exchange是在Direct exchange上的扩展,routing_key不局限于完全匹配,而是像一种正则的样子去匹配。不过只有两个特殊通配符:
*号用来匹配一个单词,比如"quick.orange.rabbit" 就可以用 *.orange.* 匹配到
#号用来匹配0到多个单词,比如“lazy.orange.male.rabbit”可以用lazy.# 匹配
ETopic_P.cs
/// <summary>
/// Topic producer demo: declares the "ETopic_test" topic exchange, binds three durable
/// queues with the patterns "log.#", "*.error" and "*.waring", then publishes 20 messages
/// cycling through the routing keys "log.error", "log.waring" and "log.waring.error",
/// one per second.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    // Reference: http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    Console.WriteLine("********************ETopic P***************");
    // NOTE: this banner is printed but no input is read; the loop below generates the messages.
    Console.WriteLine("please Input send message:");

    // Connection settings, option 1: set the individual properties.
    var factory = new RabbitMQ.Client.ConnectionFactory();
    factory.UserName = "admin";
    factory.Password = "admin";
    factory.VirtualHost = "/";
    factory.HostName = "10.19.52.80";
    // Option 2: a single AMQP URI.
    //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");

    // Open a connection, then a channel on that connection.
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Declare the exchange and queues from code. The exchange type is Topic.
        string exchangeName = "ETopic_test";
        channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);

        string queueName1 = "qTopic_test1";
        string queueName2 = "qTopic_test2";
        string queueName3 = "qTopic_test3";
        bool durable = true; // queues are declared durable
        channel.QueueDeclare(queueName1, durable, false, false, null);
        channel.QueueDeclare(queueName2, durable, false, false, null);
        channel.QueueDeclare(queueName3, durable, false, false, null);

        // Binding patterns: '#' matches zero or more words, '*' matches exactly one word.
        // The word "waring" (sic) is kept as-is because bindings and publishes must match.
        channel.QueueBind(queueName1, exchangeName, "log.#", null);
        channel.QueueBind(queueName2, exchangeName, "*.error", null);
        channel.QueueBind(queueName3, exchangeName, "*.waring", null);

        // i % 3 selects the (message, routing key) pair:
        //   0 -> "error"  / "log.error"
        //   1 -> "waring" / "log.waring"
        //   2 -> "info"   / "log.waring.error"
        string[] messages = { "error", "waring", "info" };
        string[] routingKeys = { "log.error", "log.waring", "log.waring.error" };

        for (int i = 0; i < 20; i++)
        {
            int slot = i % messages.Length;
            string message = messages[slot];
            var body = Encoding.UTF8.GetBytes(message);
            // Published with null properties, i.e. without setting the Persistent flag
            // that the other producers in this tutorial set.
            channel.BasicPublish(exchangeName, routingKeys[slot], false, null, body);
            Console.WriteLine("[ETopic P] send : {0}", message);
            Thread.Sleep(1000);
        }
    }

    // Keep the console window open after publishing finishes.
    Console.ReadLine();
}
ETopic_C.cs
/// <summary>
/// Topic consumer demo: subscribes to the "qTopic_test3" queue, prints each message
/// together with the routing key it arrived with, and acknowledges it manually.
/// Runs until a line is entered on the console.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    // Reference: http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    Console.WriteLine("********************Topic_C3***************");

    // Broker connection settings.
    var factory = new ConnectionFactory
    {
        UserName = "admin",
        Password = "admin",
        VirtualHost = "/",
        HostName = "10.19.52.80"
    };

    // Open a connection, then a channel on it; both are disposed when the block exits.
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Fair dispatch is left disabled; uncomment to enable it.
        //channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);

        // Event-based (push) consumption.
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, delivery) =>
        {
            string text = Encoding.UTF8.GetString(delivery.Body);
            string key = delivery.RoutingKey;
            Console.WriteLine("[Topic_C3] received : {0}--routingkey {1}", text, key);
            // Manual acknowledgement of this single delivery.
            channel.BasicAck(delivery.DeliveryTag, false);
        };

        // autoAck is off, so the handler above must ack each message.
        channel.BasicConsume(queue: "qTopic_test3", autoAck: false, consumer: consumer);

        // Block here so the channel stays open while messages arrive.
        Console.ReadLine();
    }
}
Topic exchange运行结果
Web后台绑定关系
网友评论