美文网首页
RabbitMQ~消费者实时与消息服务器保持通话

RabbitMQ~消费者实时与消息服务器保持通话

作者: yichen_china | 来源:发表于2021-09-10 08:50 被阅读0次

    这个文章主要介绍简单的消费者的实现,rabbitMQ实现的消费者可以对消息服务器进行实时监听,当有消息(生产者把消息推到服务器上之后),消费者可以自动去消费它,这通常是开启一个进程去维护这个对话,它与消息服务器保持一个TCP的长连接,整个这个过程于rabbitMQ为我们提供,程序开发人员只需要实现自己的回调方法即可.

    简单的rabbitMQ消费者

        /// <summary>
        /// 消息消费者
        /// </summary>
        public class RabbitMqSubscriber : Lind.DDD.Commons.DisposableBase
        {
            private readonly string exchangeName;
            private readonly string queueName;
            private readonly IConnection connection;
            private readonly IModel channel;
            private bool disposed;
    
            /// <summary>
            /// 从消息服务器拉到消息后触发
            /// </summary>
            public event EventHandler<MessageReceivedEventArgs> MessageReceived;
    
            /// <summary>
            /// Initializes a new instance of <c>RabbitMqMessageSubscriber</c> class.
            /// </summary>
            /// <param name="uri"></param>
            /// <param name="exchangeName"></param>
            /// <param name="queueName"></param>
            public RabbitMqSubscriber(string uri, string queueName, string userName = "", string password = "")
            {
                this.exchangeName = exchangeName;
                this.queueName = queueName;
                var factory = new ConnectionFactory() { Uri = uri };
                if (!string.IsNullOrWhiteSpace(userName))
                    factory.UserName = userName;
                if (!string.IsNullOrWhiteSpace(password))
                    factory.Password = password;
                this.connection = factory.CreateConnection();
                this.channel = connection.CreateModel();
            }
    
            public void Subscribe()
            {
                channel.QueueDeclare(
                    queue: this.queueName, 
                    durable: false,//持久化
                    exclusive: false, //独占,只能被一个consumer使用
                    autoDelete: false,//自己删除,在最后一个consumer完成后删除它
                    arguments: null);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) =>
                {
                    var body = e.Body;
                    var json = Encoding.UTF8.GetString(body);
                    var message = JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
                    this.OnMessageReceived(new MessageReceivedEventArgs(message));
                    channel.BasicAck(e.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: queueName,
                                     noAck: false,
                                     consumer: consumer);
            }
    
            private void OnMessageReceived(MessageReceivedEventArgs e)
            {
                this.MessageReceived?.Invoke(this, e);
            }
    
            protected override void Finalize(bool disposing)
            {
                if (disposing)
                {
                    if (!disposed)
                    {
                        this.channel.Dispose();
                        this.connection.Dispose();
                        disposed = true;
                    }
                }
            }
        }
    

    简单调用

       class Program
        {
            static void Main(string[] args)
            {
                var subscriber = new Lind.DDD.RabbitMq.RabbitMqSubscriber("amqp://localhost:5672", "zzl");
                subscriber.MessageReceived += Subscriber_MessageReceived;
                subscriber.Subscribe();
                Console.ReadKey();
            }
    
            private static void Subscriber_MessageReceived(object sender, RabbitMq.MessageReceivedEventArgs e)
            {
                Console.WriteLine("消费者2->消费了一个消息{0}", e.Message);
                Lind.DDD.Logger.LoggerFactory.Instance.Logger_Debug("消费者2->消费了一个消息{0}" + e.Message);
                Thread.Sleep(2000);
            }
    
        }
    

    实时拉消息

    RabbitMQ消息模型

    通过上面图我们可以更容易和清晰的去理解rabbitmq的工作流程.

    相关文章

      网友评论

          本文标题:RabbitMQ~消费者实时与消息服务器保持通话

          本文链接:https://www.haomeiwen.com/subject/udxwwltx.html