美文网首页
RabbitMQ的基本概念

RabbitMQ的基本概念

作者: pingwazi | 来源:发表于2020-09-06 20:53 被阅读0次

    有问题请联系我QQ:273206491

    RabbitMQ是基于Erlang语言(俗称:二郎神)对AMQP协议的实现。

    1、各个模块之间的一览图

    image.png

    2、连接

    这里以Java的客户端进行说明,客户端与RabbitMQ服务器之间是基于TCP连接的,而TCP连接的创建和销毁都非常耗费资源,因此RabbitMQ使用连接复用模式,也就是我们常用的Channel,一个TCP连接可以创建多个Channel,不同Channel之间是互相独立的,一个线程使用一个Channel是安全的,不会出现多线程共享同一个连接的问题(线程共享一个资源很容易出现线程安全的问题)。

    2.1、连接创建示例

    //连接工厂的初始化
    ConnectionFactory connectionFactory=new ConnectionFactory();
    connectionFactory.setHost("rabbitmq服务器的地址");
    connectionFactory.setUsername("用户名");
    connectionFactory.setPassword("密码");
    //通过连接工厂与rabbitmq之间建立一个tcp连接
    Connection connection =connectionFactory.newConnection();
    //通过连接创建信道,多个线程之间不要共享同一个信道
    Channel channel = connection.createChannel();
    

    2.2、资源释放问题

    在连接不使用的时候及时关闭连接是非常重要的步骤,可能你在本地开发时不释放连接不会有什么问题,但一旦程序上线后,连接不释放很有很多导致RabbitMQ的连接因耗尽而无法接受新的连接请求或者其他什么问题。
    释放连接的方式也非常简单,直接调用Connection对象的close方法可以释放一个连接,同时也会释放这个连接下的所有Channel资源。

    3、生产者

    用于生产消息,使用basicPublish向RabbitMQ发送消息

    upChannel.basicPublish("msgExchange", "message", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
    
    • exchange 交换器的名称
    • routingKey 路由key(当交换器的类型是fanout时,路由key是不生效的)
    • props 用来设置消息的相关属性
    ///下面这两种方式是等价的
    MessageProperties.PERSISTENT_TEXT_PLAIN
    或者
    AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
    properties.deliveryMode(2);
    properties.contentType("text/plain");
    properties.headers(null);//用于设置自己的header属性,交换器的类型中就有一种为headers(但不推荐使用),当我们设置的headers属性值和交换器绑定的值一致是就能够路由到响应队列中
    
    • body 消息体
    • mandatory

    4、交换器

    交换器可以认为是一个消息中转站,他通过和队列进行绑定,我们把消息发送到交换器中,交换器根据路由key再决定将消息投递给哪个队列(交换器的类型为fanout时,路由key是无效的)。

    4.1、类型

    • fanout 会把消息给绑定到这个交换器的所有队列都发生一遍
    • direct 只将消息发生给路由匹配的队列
    • topic 将消息发生获取路由匹配的对象,但是这里的匹配支持模糊匹配,rabbitmq的路由key使用点"."来分隔一个单词,"*"匹配一个单词,"#"匹配一个或者多个单词。举个例子:路由key:com.pingwazi.rabbitmq,绑定key1:com.#,绑定key2:com.*.*是匹配的。
    • headers 发生消息的headers属性完全匹配是则认为匹配,这个模式不常用,并且性能也很差。

    4.2、创建

    //声明一个交换器,如果存在就不创建,如果存在的交换器参数与声明交换器参数不匹配就会报错,如果不存在就会创建
    //这方式什么是同步的
    upChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
    //这种方式声明式异步的,但是不推荐使用,因为可能你在调用了之后就去使用的时候RabbitMQ服务器还没有创建好。
    upChannel.exchangeDeclareNoWait();
    
    • exchange 交换器的名字
    • type 交换器的类型
    • durable 指定这个交换器是持久化的
    • autoDelete 是否自动删除,true则表示自动删除,但是有一个前提条件,那就是曾经至少要有一个队列或者交换器与之绑定,但是现在都已经接触绑定了。这时RabbitMQ服务器才会自动的吧这个交换器给删除掉,也就是说如果这个交换器创建出来后,没有任何队列或者交换器与之绑定的话,RabbitMQ是不会自动删除的,即使我们设置了自动删除。
    • internal 设置是否是内置的交换器,如果设置为true,则表示是内置的,那么客户端程序将无法直接向这个交换器发送消息,只能通过交换器路由到交换器的方式。
    • argument 其他可选参数

    4.3、判断交换器是否存在

    //判断指定交换器是否存在,如果不存在就会报404异常。
    upChannel.exchangeDeclarePassive();
    

    4.4、删除交换器

    // 同步删除
    upChannel.exchangeDelete("",true);
    //异步删除(不需要等待删除完成)
    upChannel.exchangeDeleteNoWait("",true);
    
    • exchange 交换器的名称
    • ifUnused 是否只删除没有被使用的交换器

    5、绑定路由key

    //将交换器与队列进行绑定通过message进行绑定
    upChannel.queueBind("msgQueue","msgExchange","message");
    //将交换器与队列进行解绑
    upChannel.queueUnbind("","","")
    //将交换器与交换器绑定在一起
    upChannel.exchangeBind( "","","")
    //将交换器与交换器解绑
    upChannel.exchangeUnbind( "","","")
    

    绑定路由key是在绑定交换器和队列时指定的一个key,其中message就是masQueue队列与msgExchange交换器的绑定路由key。

    路由key是发送消息的时候指定的key,交换器类型为direct或者topic时,路由key与绑定路由key后才会将消息发送到对应的队列中。

    6、队列

    队列是RabbitMQ实际存储消息的地方

    //声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
    upChannel.queueDeclare("msgQueue", true, false, false, null);
    //异步声明一个队列,不推荐使用,原因同交换器一样
    upChannel.queueDeclareNoWait();
    
    • queue 队列的名称
    • durable 设置队列是否为持久化队列,如果为true这表示是持久化队列
    • exclusive 是否为排它队列,值为true则表示为排他队列,如果一个队列是排它队列,那么除了创建他的连接(注意:这里说的是连接,不是channel)能够使用之外,其他连接都不能使用,并且在创建它的连接断开时,这个队列会自动删除,即使设置为持久化队列也是如此。
    • autoDelete 是否自动删除,值为true则表示自动删除,自动删除有一个前提条件,那就是曾经至少有一个消费者使用了这个队列,并且现在已经没有任何消费与这个队列建立了连接,这时候RabbitMQ才会自动删除这个队列。也就是说如果这个队列还没有任何消费与建立过连接,那么RabbitMQ是不会自动删除的。
    • arguments 其他的一些参数设置

    6.1、判断队列是否存在

    //判断队列是否存在
    upChannel.queueDeclarePassive();
    

    6.2、删除队列

    //同步删除
    upChannel.queueDelete("",true,true);
    //异步删除
    upChannel.queueDeleteNoWait("",true,true);
    
    • queue 队列名称
    • ifUnused 是否只删除没有被使用过的队列
    • ifEmpty 是否只删除空的队列

    7、消费者

    消息的消费者有两种常用的模式,即推模式和拉模式。两种模式的实现方式完全不同,推模式是只要队列中有消息了就会推送给消费者(当然了也要受未确认消息数的限制),而拉模式则是消费者需要的时候再去RabbitMQ中获取,而他一次也只能获取获取一条消息。

    7.1、拉模式

    image.png

    从图中可以看出,在单线程的情况下,消息的处理速度是比较慢的,当然了这里也可以使用多线程不断的从Rabbitmq中去获取,但这样就需要手动实现获取算法了。不废话,先上代码!

    private void receiveGetMessage()
        {
            try
            {
                Channel downChannel=connection.createChannel();
                //交换器类型:fanout、direct、topic
                //声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
                downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
                //声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
                downChannel.queueDeclare("msgQueue", true, false, false, null);
                //将交换器与队列进行绑定通过message进行绑定
                downChannel.queueBind("msgQueue","msgExchange","message");
                //消息未确认消息的数量
                downChannel.basicQos(1);//在非自动确认的模式下,限制最多允许未确认的消息数量
                boolean isBreak=false;
                while (!isBreak)
                {
                    //消费消息
                    GetResponse msgData = downChannel.basicGet("", false);
                    String msgBody=new String(msgData.getBody(), "utf-8");
                    System.out.println(Thread.currentThread().getId()+"RabbitMQ拉模式消费者收到消息: " + msgBody);
                    //回复确认消息
                    downChannel.basicAck(msgData.getEnvelope().getDeliveryTag(),false);
                    if(StringUtils.isEmpty(msgBody))
                        isBreak=true;
                }
                downChannel.close();
            }
            catch (ShutdownSignalException ex)
            {
                //连接异常关闭了,这里要进行检查,并尝试重新建立连接
                ex.printStackTrace();
            }
            catch (IOException ex)
            {
                //发生io异常需要进行处理,对应channel可能关闭了
                ex.printStackTrace();
            } catch (TimeoutException e) {
                //信道资源释放超时,可能对应的channel关闭了
                e.printStackTrace();
            }
        }
    

    以上代码我是通过但线程循环的方式从RabbitMQ中拉取代码,这种模式处理速度较慢,在不是用多线程进行处理的情况下,这中模式适合用于处理单个消息比较耗时的场景。

    7.2、推模式

    image.png

    通过运行如下代码可以看得出,推模式实际上是使用了多线程的在进行处理的。但是他的吞吐量是默认拉模式的好几倍,这中模式适合于处理每个消息的时间比较短的场景。

    private void receivePushMessage()
        {
            try
            {
                Channel downChannel=connection.createChannel();
                //交换器类型:fanout、direct、topic
                //声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
                downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
                //声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
                downChannel.queueDeclare("msgQueue", true, false, false, null);
                //将交换器与队列进行绑定通过message进行绑定
                downChannel.queueBind("msgQueue","msgExchange","message");
                //消息未确认消息的数量
                downChannel.basicQos(10000);//在非自动确认的模式下,限制最多允许未确认的消息数量
                //消费消息
                downChannel.basicConsume("msgQueue",createConsumer(downChannel));
                System.out.println("RabbitMQ消费者正在运行中...");
                //不能释放信道资源!!!
                //因为这里的消费者是用的推模式,如果关闭了信道,后面在进行消息消费的时候会报错
                //downChannel.close();
            }
            catch (ShutdownSignalException ex)
            {
                //连接异常关闭了,这里要进行检查,并尝试重新建立连接
                ex.printStackTrace();
            }
            catch (IOException ex)
            {
                //发生io异常需要进行处理,对应channel可能关闭了
                ex.printStackTrace();
            }
        }
    
        /**
         * 创建消费对象
         * @param channel
         * @return
         */
        private Consumer createConsumer(Channel channel)
        {
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println(Thread.currentThread().getId()+"RabbitMQ推模式消费者收到消息: " + message);
                    // 消息确认
                    try {
                        channel.basicAck(envelope.getDeliveryTag(), false);//手动确认消息
                    } catch (IOException e) {
                        //发生io异常需要进行处理,对应channel可能关闭了
                        e.printStackTrace();
                    }
                }
            };
            return consumer;
        }
    

    相关文章

      网友评论

          本文标题:RabbitMQ的基本概念

          本文链接:https://www.haomeiwen.com/subject/dubcektx.html