美文网首页程序园技术初心简友广场
C# 简单操作 RabbitMQ 发送与接收队列消息

C# 简单操作 RabbitMQ 发送与接收队列消息

作者: triplestudio | 来源:发表于2019-11-22 15:21 被阅读0次

    一、前言

    为了解藕下发指令功能,加入了 RabbitMQ 中间件,目的很简单,就是使用独立出来的消息中间件,使得两端的应用互不影响,你重启你的,我发送我的。

    根据此应用场景,对组件功能的需求也比较简单。考虑到类似的简单场景也具备一定的通用性,于是进行简易封装。

    期望的外部方法主要有:

    • 指定队列名称发送消息
    • 消息到达可指定处理方法

    这里我们使用默认的 exchange,也即不涉及这个概念。

    引用程序集,如:rabbitmq-dotnet-client-3.6.6-dotnet-4.5.zip

    https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/

    二、先看使用方式

    为了简洁,这里在同一个控制台上完成同一队列的发送与接收,手动输入发送内容,接收显示发送的内容。

    (1)形成通话对象

    参数如其名,将相应的信息作为参数传递即可。

    RabbitMQQueueTalker rmt = new RabbitMQQueueTalker(
            host: "localhost",      
            username: "guest", 
            password: "guest", 
            sendQueueName: "QueueTalk", 
            receiveQueueName: "QueueTalk", 
            durable: false);
    

    (2)绑定消息响应方法

    // 消息接收响应
    rmt.OnMessage(s => {
        Console.WriteLine(String.Format("receive message : {0}", s));
    }); 
    

    (3)发送消息

    rmt.SendMessage(message);
    

    完整调用过程

    class Program
    {
        static void Main(string[] args)
        {
            RabbitMQQueueTalker rmt = new RabbitMQQueueTalker(
                host: "localhost", 
                username: "guest", 
                password: "guest", 
                sendQueueName: "QueueTalk", 
                receiveQueueName: "QueueTalk", 
                durable: false);
    
            // 消息接收响应
            rmt.OnMessage(s => {
                Console.WriteLine(String.Format("receive message : {0}", s));
            }); 
    
            // 输入并发送消息
            while (true)
            {
                // input message
                string message = Console.ReadLine();
                if (rmt.SendMessage(message))
                {
                    Console.WriteLine("send message :{0}", message);
                }
            }
        }
    }
    
    运行结果如下:

    三、封装实现

    3.1 启动连接

    应用需要连接能够断线重连,这里定义 GetConnection 方法,实现在未建立时新建连接,建立之后则由断线重连属性 AutomaticRecoveryEnabled 完成断线之后的恢复。

    private IConnection GetConnection()
    {
        if (_Connection != null) return _Connection;
    
        var factory = new ConnectionFactory()
        {
            HostName = this.Host,
            UserName = this.UserName,
            Password = this.Password,
            RequestedHeartbeat = 10,
            AutomaticRecoveryEnabled = true
        };
    
        try
        {
            factory.RequestedConnectionTimeout = 6000;
            _Connection = factory.CreateConnection();
    
            // 阻塞解除之后检测接收通道是否还打开
            _Connection.ConnectionUnblocked += (o, e) => {
                BuildReceiveChannel();
            };
    
            return _Connection;
        }
        catch (Exception se)
        {
            Debug.WriteLine(se.Message);
            Debug.WriteLine(se.StackTrace);
            return null;
        }
    }
    

    3.2 发送通道与发送方法

    默认队列,在通道建立的过程中进行队列声明 QueueDeclare

    // 获取发送通道
    private IModel GetSendChannel()
    {
        if (_SendChannel != null && !_SendChannel.IsClosed) return _SendChannel;
    
        var conn = GetConnection();
        if (conn == null) return null;
    
        try
        {
            _SendChannel = conn.CreateModel();
    
            _SendChannel.QueueDeclare(queue: SendQueueName,
                                durable: Durable,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);
    
            return _SendChannel;
        }
        catch (Exception se)
        {
            Debug.WriteLine(se.Message);
            Debug.WriteLine(se.StackTrace);
            return null;
        }
    }
    

    对于临时指定队列名称的,则在方法中动态进行队列声明 QueueDeclare。开放的方法如下。

    /// <summary>
    /// 向默认发送队列发送消息
    /// </summary>
    /// <param name="message"></param>
    /// <returns></returns>
    public bool SendMessage(string message)
    {
        return SendMessage(SendQueueName, message);
    }
    
    /// <summary>
    /// 向指定队列发送消息
    /// </summary>
    /// <param name="queueName"></param>
    /// <param name="message"></param>
    /// <returns></returns>
    public bool SendMessage(string queueName, string message)
    {
        var channel = GetSendChannel();
        if (channel == null) return false;
    
        try
        {
            if (SendQueueName != queueName)
            {
                channel.QueueDeclare(queue: queueName,
                                durable: Durable,   
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);
            }
    
            var body = Encoding.UTF8.GetBytes(message);
    
            // 发送消息
            channel.BasicPublish(exchange: "",
                                    routingKey: queueName,
                                    basicProperties: null,
                                    body: body);
            return true;
        }
        catch (Exception se)
        {
            Debug.WriteLine(se.Message);
            Debug.WriteLine(se.StackTrace);
            return false;
        }
    }
    

    3.3 接收通道与接收方式

    定义响应方法列表,用于在消息事件中逐个调用。

    private List<Action<string>> ReceiveActionList = new List<Action<string>>();
    

    建立接收通道,并绑定接收事件。

    // 建立接收通道
    private IModel BuildReceiveChannel()
    {
        if (_ReceiveChannel != null && !_ReceiveChannel.IsClosed) return _ReceiveChannel;
    
        var conn = GetConnection();
        if (conn == null) return null;
    
        try
        {
            _ReceiveChannel = conn.CreateModel();
    
            _ReceiveChannel.QueueDeclare(queue: ReceiveQueueName,
                            durable: Durable,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);
    
            var consumer = new EventingBasicConsumer(_ReceiveChannel);
    
            // 绑定消息事件
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                foreach (Action<string> action in ReceiveActionList)
                {
                    action(message);
                }                    
            };
            // 启动消费者
            _ReceiveChannel.BasicConsume(queue: ReceiveQueueName,
                                    noAck: true,
                                    consumer: consumer);
    
            return _ReceiveChannel;
        }
        catch (Exception se)
        {
            Debug.WriteLine(se.Message);
            Debug.WriteLine(se.StackTrace);
            return null;
        }
    }
    

    向外开放添加响应方法的功能。

    /// <summary>
    /// 添加消息到达响应方法
    /// </summary>
    /// <param name="action"></param>
    public void OnMessage(Action<string> action)
    {
        lock (ReceiveActionList)
        {
            ReceiveActionList.Add(action);
        }
    
        BuildReceiveChannel();
    }
    

    详细源码

    https://github.com/triplestudio/helloworld/blob/master/RabbitMQTest/RabbitMQQueueTalker.cs

    相关文章

      网友评论

        本文标题:C# 简单操作 RabbitMQ 发送与接收队列消息

        本文链接:https://www.haomeiwen.com/subject/phvfwctx.html