一、前言
为了解藕下发指令功能,加入了 RabbitMQ 中间件,目的很简单,就是使用独立出来的消息中间件,使得两端的应用互不影响,你重启你的,我发送我的。
根据此应用场景,对组件功能的需求也比较简单。考虑到类似的简单场景也具备一定的通用性,于是进行简易封装。
期望的外部方法主要有:
- 指定队列名称发送消息
- 消息到达可指定处理方法
这里我们使用默认的 exchange,也即不涉及这个概念。
引用程序集,如:rabbitmq-dotnet-client-3.6.6-dotnet-4.5.zip
https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/
二、先看使用方式
为了简洁,这里在同一个控制台上完成同一队列的发送与接收,手动输入发送内容,接收显示发送的内容。
(1)形成通话对象
参数如其名,将相应的信息作为参数传递即可。
RabbitMQQueueTalker rmt = new RabbitMQQueueTalker(
host: "localhost",
username: "guest",
password: "guest",
sendQueueName: "QueueTalk",
receiveQueueName: "QueueTalk",
durable: false);
(2)绑定消息响应方法
// 消息接收响应
rmt.OnMessage(s => {
Console.WriteLine(String.Format("receive message : {0}", s));
});
(3)发送消息
rmt.SendMessage(message);
完整调用过程
class Program
{
static void Main(string[] args)
{
RabbitMQQueueTalker rmt = new RabbitMQQueueTalker(
host: "localhost",
username: "guest",
password: "guest",
sendQueueName: "QueueTalk",
receiveQueueName: "QueueTalk",
durable: false);
// 消息接收响应
rmt.OnMessage(s => {
Console.WriteLine(String.Format("receive message : {0}", s));
});
// 输入并发送消息
while (true)
{
// input message
string message = Console.ReadLine();
if (rmt.SendMessage(message))
{
Console.WriteLine("send message :{0}", message);
}
}
}
}
运行结果如下:
三、封装实现
3.1 启动连接
应用需要连接能够断线重连,这里定义 GetConnection 方法,实现在未建立时新建连接,建立之后则由断线重连属性 AutomaticRecoveryEnabled
完成断线之后的恢复。
private IConnection GetConnection()
{
if (_Connection != null) return _Connection;
var factory = new ConnectionFactory()
{
HostName = this.Host,
UserName = this.UserName,
Password = this.Password,
RequestedHeartbeat = 10,
AutomaticRecoveryEnabled = true
};
try
{
factory.RequestedConnectionTimeout = 6000;
_Connection = factory.CreateConnection();
// 阻塞解除之后检测接收通道是否还打开
_Connection.ConnectionUnblocked += (o, e) => {
BuildReceiveChannel();
};
return _Connection;
}
catch (Exception se)
{
Debug.WriteLine(se.Message);
Debug.WriteLine(se.StackTrace);
return null;
}
}
3.2 发送通道与发送方法
默认队列,在通道建立的过程中进行队列声明 QueueDeclare
。
// 获取发送通道
private IModel GetSendChannel()
{
if (_SendChannel != null && !_SendChannel.IsClosed) return _SendChannel;
var conn = GetConnection();
if (conn == null) return null;
try
{
_SendChannel = conn.CreateModel();
_SendChannel.QueueDeclare(queue: SendQueueName,
durable: Durable,
exclusive: false,
autoDelete: false,
arguments: null);
return _SendChannel;
}
catch (Exception se)
{
Debug.WriteLine(se.Message);
Debug.WriteLine(se.StackTrace);
return null;
}
}
对于临时指定队列名称的,则在方法中动态进行队列声明 QueueDeclare
。开放的方法如下。
/// <summary>
/// 向默认发送队列发送消息
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public bool SendMessage(string message)
{
return SendMessage(SendQueueName, message);
}
/// <summary>
/// 向指定队列发送消息
/// </summary>
/// <param name="queueName"></param>
/// <param name="message"></param>
/// <returns></returns>
public bool SendMessage(string queueName, string message)
{
var channel = GetSendChannel();
if (channel == null) return false;
try
{
if (SendQueueName != queueName)
{
channel.QueueDeclare(queue: queueName,
durable: Durable,
exclusive: false,
autoDelete: false,
arguments: null);
}
var body = Encoding.UTF8.GetBytes(message);
// 发送消息
channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: null,
body: body);
return true;
}
catch (Exception se)
{
Debug.WriteLine(se.Message);
Debug.WriteLine(se.StackTrace);
return false;
}
}
3.3 接收通道与接收方式
定义响应方法列表,用于在消息事件中逐个调用。
private List<Action<string>> ReceiveActionList = new List<Action<string>>();
建立接收通道,并绑定接收事件。
// 建立接收通道
private IModel BuildReceiveChannel()
{
if (_ReceiveChannel != null && !_ReceiveChannel.IsClosed) return _ReceiveChannel;
var conn = GetConnection();
if (conn == null) return null;
try
{
_ReceiveChannel = conn.CreateModel();
_ReceiveChannel.QueueDeclare(queue: ReceiveQueueName,
durable: Durable,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(_ReceiveChannel);
// 绑定消息事件
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
foreach (Action<string> action in ReceiveActionList)
{
action(message);
}
};
// 启动消费者
_ReceiveChannel.BasicConsume(queue: ReceiveQueueName,
noAck: true,
consumer: consumer);
return _ReceiveChannel;
}
catch (Exception se)
{
Debug.WriteLine(se.Message);
Debug.WriteLine(se.StackTrace);
return null;
}
}
向外开放添加响应方法的功能。
/// <summary>
/// 添加消息到达响应方法
/// </summary>
/// <param name="action"></param>
public void OnMessage(Action<string> action)
{
lock (ReceiveActionList)
{
ReceiveActionList.Add(action);
}
BuildReceiveChannel();
}
详细源码
https://github.com/triplestudio/helloworld/blob/master/RabbitMQTest/RabbitMQQueueTalker.cs
网友评论