使用C#编写RabbitMQ的consumer时,需要注意,如果侦听多个队列,需要确保每个侦听线程都处在运行状态,下面是示例代码:
using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;
using Newtonsoft.Json;
using RabbitMQClient;
// Sample RabbitMQ consumer that listens on two queues over a single channel.
//
// The handlers are asynchronous, so the async consumer type is used together
// with DispatchConsumersAsync = true. Attaching an async lambda to the
// synchronous EventingBasicConsumer.Received event produces an "async void"
// handler whose exceptions cannot be observed by the client library.
var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost",
    // Required for AsyncEventingBasicConsumer handlers to be dispatched.
    DispatchConsumersAsync = true
};

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // Declare both queues up front so consuming does not fail if they do not exist yet.
    channel.QueueDeclare(queue: "PlanInputQueue",
                         durable: false,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueDeclare(queue: "PlanApproveQueue",
                         durable: false,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    // NOTE(review): with the async dispatcher, handlers sharing a channel may be
    // invoked one at a time — confirm this is acceptable for the expected load.

    // Consumer for PlanInputQueue: forwards each message to Approve, then to Query.
    var inputConsumer = new AsyncEventingBasicConsumer(channel);
    inputConsumer.Received += async (model, ea) =>
    {
        // Copy the payload before the first await; the delivery buffer should not
        // be relied upon once the handler has yielded.
        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
        Console.WriteLine("收到InputPlan {0}", message);
        try
        {
            // Presumably an empty result from Utility means success and a
            // non-empty one is an error description — verify against Utility.
            var res = await Utility.SendPlan(message);
            if (string.IsNullOrEmpty(res))
            {
                res = "发送InputPlan到Approve成功:" + message;
            }
            Console.WriteLine(res);

            res = await Utility.SendInputToQuery(message);
            if (string.IsNullOrEmpty(res))
            {
                res = "发送InputPlan到Query成功:" + message;
            }
            Console.WriteLine(res);
        }
        catch (Exception ex)
        {
            // autoAck is true, so the broker will not redeliver this message;
            // report the failure instead of letting it take down the consumer.
            Console.WriteLine("处理InputPlan失败: {0}", ex);
        }
    };
    channel.BasicConsume(queue: "PlanInputQueue",
                         autoAck: true,
                         consumer: inputConsumer);

    // Consumer for PlanApproveQueue: forwards each message to Query.
    var approveConsumer = new AsyncEventingBasicConsumer(channel);
    approveConsumer.Received += async (model, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
        Console.WriteLine("收到ApprovePlan {0}", message);
        try
        {
            var res = await Utility.SendApproveToQuery(message);
            if (string.IsNullOrEmpty(res))
            {
                res = "发送ApprovePlan到Query成功:" + message;
            }
            Console.WriteLine(res);
        }
        catch (Exception ex)
        {
            Console.WriteLine("处理ApprovePlan失败: {0}", ex);
        }
    };
    channel.BasicConsume(queue: "PlanApproveQueue",
                         autoAck: true,
                         consumer: approveConsumer);

    // Block inside the using scope: leaving it disposes the channel and the
    // connection, which stops both consumers.
    Console.WriteLine(" 按回车退出");
    Console.ReadLine();
}
上面的代码中,Console.ReadLine()确保程序不退出,这样,两个consumer可以接收到数据。如果把这句话放到using connection范围之外,就无法接收到消息,因为离开using作用域时,connection和channel已经被释放(关闭),consumer也随之停止接收。
网友评论