美文网首页Java 杂谈消息队列
RabbitMQ如何保证消息的可达性

RabbitMQ如何保证消息的可达性

作者: 清幽之地 | 来源:发表于2018-11-25 23:54 被阅读2次

    一、RabbitMQ简介

    AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

    RabbitMQ,又称为高性能分布式消息队列,它实现了AMQP标准协议。

    分布式消息队列有很多应用场景,比如异步处理、应用解耦、流量削峰等。

    1、异步处理

    用户注册后需要发送短信和邮件,传统做法是先将用户信息写入数据库,然后发送短信、发送邮件,都完成后返回。

    如果用到消息队列,可以先将用户信息写入数据库,然后将注册信息写入消息队列,发送短信、发送邮件或者还有其他的业务逻辑都订阅此消息,完成发送。

    2、应用解耦

    还是上面的例子,如果在一个大型分布式网站中,用户系统、短信系统、邮件系统可能都是独立的系统服务。

    这时候,在用户注册成功后,你可以通过RPC远程调用不同的服务接口,但更好的做法还是通过消息队列,订阅自己感兴趣的数据,日后就算增加或者删减功能,主业务都不用变动。

    3、流量削峰

    一般在秒杀或者团购活动中,流量激增,应用面临压力过大。可以在应用前端加入消息队列,通过设置队列最大长度来限制活动人数。这时候,后端服务器就可以游刃有余的处理数据了。

    二、消息通信

    在AMQP协议中,有几个基本概念,我们必须先搞明白。

    1、Virtual host

    虚拟主机,每一个虚拟主机中包含所有的AMQP基本组件,用户、队里、交换器等都是在虚拟主机里面创建。典型的用法是,如果公司的多个产品只想用一个服务器,就可以把他们划分到不同的虚拟主机中,里面的任何信息都是独立存在,互不干扰。

    2、Connection

    连接,应用程序和服务器之间的TCP连接。

    3、Channel

    通道,当你的应用程序和服务器连接之后,就会创建TCP连接。一旦打开了TCP连接,就可以创建一个Channel通道,所以说Channel通道是一个TCP连接的内部逻辑单元。
    这是因为,创建和销毁TCP连接是比较昂贵的开销,每一次访问都建立新的TCP连接的话,不仅是巨大浪费,而且还容易造成系统性能瓶颈。

    4、Queue

    队列,所有的消息最终都会被送到这里,等待着被感兴趣的人取走。

    5、Exchange

    交换器,消息到达服务的第一站就是交换器,然后根据分发规则,匹配路由键,将消息放到对应队列中。值得注意的是,交换器的类型不止一种。

    • Direct
      直连交换器,只有在消息中的路由键和绑定关系中的键一致时,交换器才把消息发到相应队列

    • Fanout
      广播交换器,只要消息被发送到广播交换器,它会将消息发到所有的队列

    • Topic
      主题交换器,根据路由键,通配规则(*和#),将消息发到相应队列

    6、Binding

    绑定,交换器和队列之间的绑定关系,绑定中就包含路由键,绑定信息被保存到交换器的查询表中,交换器根据它分发消息。

    了解到这些组件相关概念后,我们总结一下来看看,一条消息在RabbitMQ中是如何流转的。


    RabbitMQ消息流转

    三、持久化和发送方确认

    1、持久化

    事实上,上图所示只是一个最基本的消息流转过程,交换器和队列这些组件还有一个比较重要的属性:持久化。

    默认情况下,重启RabbitMQ服务器之后,我们创建的交换器和队列都会消失不见,当然了,如果里面还有未来得及消费的数据,也将难于幸免。
    持久化交换器和队列,为的是在AMQP服务器重启之后,重新创建它们并绑定关系,在RabbitMQ中,设置durable属性为true即可。

    不过,除了这些还不够。虽然保证了交换器和队列是安全的,但那些还未来得及消费的数据就变得岌岌可危。所以,我们还要设置消息的投递模式为持久的。

    这样,如果RabbitMQ服务器重启的话,我们的策略和相关数据才会确保无忧。所以,我们说能从AMQP服务器崩溃中恢复的消息,称之为持久化消息。那么,它必须保证以下三点:

    • 设置投递模式为持久的
    • 交换器为持久的
    • 队列为持久的

    2、发送方确认

    到目前为止,我们已经保证了消息的安全性。但是,还有另外一个问题。由于发布操作是不返回任何信息给生产者的,我们怎么知道服务器是否正确接收消息并持久化到硬盘上了呢?

    为此,我们可以将通道设置为事务模式。事务是AMQP标准中的一部分,但RabbitMQ有更好的做法,那就是发送方确认模式,publisher confirm。如果设置了confirm模式,发布的消息会被分配一个唯一的ID号,等消息被投递给匹配的队列后,通道会发送一个发送方确认模式给生产者(包含消息的唯一ID)。

    四、与Spring整合实例

    废话了这么多,只是为了下面的代码部分做下铺垫。毕竟,了解到上面内容之后,代码其实已经快要跃然纸上了。

    1、配置文件

    配置文件中我们首先要声明RabbitMQ服务器的信息,IP地址、端口号、用户名密码等,但尤为重要的是,设置发布确认模式。

    <bean id="rabbitConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="127.0.0.1"/>
        <property name="username" value="shiqizhen"/>
        <property name="password" value="shiqizhen"/>
        <property name="port" value="5672"></property>
        <property name="virtualHost" value="shiqizhen"></property>
        <property name="publisherConfirms" value="true"></property>
        <property name="publisherReturns" value="true"></property>
    </bean>
    

    接着,还要声明交换器和队列,记得它们是持久化的哦,durable为true。

    <rabbit:admin connection-factory="rabbitConnectionFactory"/>
        
    //队列的名字、持久化、不要自动删除、不是独享队列
    <rabbit:queue name="userInfoQueue" durable="true" auto-delete="false" exclusive="false"/>
    //交换器,类型为direct。并绑定交换器和队列的关系,路由键为10086
    <rabbit:direct-exchange name="user-exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="userInfoQueue" key="10086"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    

    最后,配置消费者和消息模板

    //配置消费者 ref为bean的引用 queues指明了消费者与队列的关系
    //重要的是acknowledge 确认模式为手动确认
    <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="manual">
            <rabbit:listener ref="consumerListener" queues="userInfoQueue" method="onMessage" />
    </rabbit:listener-container>
    
    //配置Spring RabbitMQ消息模板 
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="rabbitConnectionFactory"></constructor-arg>
        <property name="confirmCallback" ref="publisherConfirm"></property>
        <property name="returnCallback" ref="returnMsgCallBack"></property>
        <property name="mandatory" value="true"></property>
    </bean>
    

    2、生产者

    上面我们声明了rabbitTemplate,直接用它的send方法发送消息即可。不过它有几个参数必须先要了解下。

    • exchange
      交换器名称,消息发到哪个交换器上
    • routingKey
      路由键,交换器怎样分发消息到对应队列
    • Message
      消息体对象,它包含消息的主体和消息属性。消息属性包含很多附属信息,比如消息内容类型、消息ID、用户ID等。
    • CorrelationData
      消息相关数据,实际它只有一个ID的属性。不过很重要,在发布方确认的回调方法里,会带有这个参数。我们可以根据它很直观的看到哪条消息发送成功或失败。
    @Controller
    public class IndexController {
        
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/send_msg")
        @ResponseBody
        public User send_msg() {    
            String exchange = "user-exchange";
            String routingKey = "10086";
            
            User user = new User();
            String id = IdUtil.getId();
            user.setUid(id);
            user.setUsername("小小沙弥");
            user.setPassword("1234");
            user.setCreatetime(DateUtil.getDateTime(new Date()));
            
            CorrelationData correlation = new CorrelationData(id);
            Message message = new Message(JSONObject.toJSONBytes(user, SerializerFeature.WriteNullStringAsEmpty), new MessageProperties());
            logger.info("已发送消息到RabbitMQ服务器:{}",JSONObject.toJSONString(user));
            rabbitTemplate.send(exchange, routingKey,message,correlation);  
            return user;
        }
    }
    

    3、消费者

    消费者就是上面我们配置的listener ref引用的Bean。还记得我们把确认模式设置了手动确认,所以在消费者端有个很重要的动作,就是确认消息。

    • channel.basicAck(deliveryTag, false)
      第一个参数是RabbitMQ内部产生的消息ID,第二个参数代表是否批量确认消息。通过这个指令我们告诉生产者端,消息已经被正确消费了,RabbitMQ就会将此消息在磁盘上删除。
    • channel.basicReject(deliveryTag, false)
      拒绝消息。如果消费到的消息不是我们想要的,或者处理的时候报错,我们可以将消息拒绝。但值得注意的是第二个参数。如果设置为false,说明拒绝消息并将消息从服务器上删除;如果设置为true,说明拒绝消息并将消息重新放回队列。如果你的消费者只有一个,最好不要把它设置为true,否则消息会一直重试,直到把消费者端服务器搞死。如果因为处理失败而拒绝的话,最好将消息删除,同时将消息记录到日志文件或者数据库中。
    @Service
    public class ConsumerListener implements ChannelAwareMessageListener{
    
        Logger logger = LoggerFactory.getLogger(this.getClass());
        
        public void onMessage(Message message, Channel channel) throws Exception {
            
            logger.info("消费者监听到RabbitMQ消息...");
            MessageProperties properties = message.getMessageProperties();
            String msg = new String(message.getBody(),"utf-8");
            logger.info("交换器:{},路由键:{}",properties.getReceivedExchange(),properties.getReceivedRoutingKey());
            logger.info("消息内容:{}",msg); 
            long deliveryTag = properties.getDeliveryTag();
            channel.basicAck(deliveryTag, false);//确认信息,false为不批量确认
            //channel.basicReject(deliveryTag, true);//true为重入队列 false为删除消息
        }
    }
    

    4、发送方确认

    我们发送消息给RabbitMQ,第一站就是交换器。RabbitMQ是否能正确接收消息,我们就靠它来反馈。这里的CorrelationData就是在生产者端设置的,我们可以将它当成消息ID,也可以直接把消息写入这里。

    @Component
    public class PublisherConfirm implements ConfirmCallback{
    
        Logger logger = LoggerFactory.getLogger(this.getClass());
        
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                logger.info("消息投递成功!");
            }else {
                logger.warn("消息投递失败,原因:{},消息ID:{}",cause,correlationData.getId());
            }
        }
    }
    

    如果我们把交换器的名字写错,那么在这里,你就会得到以下信息:

    22:57:51,635  WARN PublisherConfirm:19 - 消息投递失败,原因:
    channel error; protocol method: #method<channel.close>
    (reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost 'shiqizhen', class-id=60, method-id=40),
    
    消息ID:516387069669408768
      22:57:51,638 ERROR CachingConnectionFactory:1278 - Channel shutdown: 
    channel error; protocol method:
     #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost 'shiqizhen', class-id=60, method-id=40)
    

    5、返回回调

    除了设置RabbitMQ的发送方确认,在Spring中还有一个publisherReturns值的我们注意。虽然我们将消息发送到了交换器,但交换器是否能正确将消息分发到对应队列,还要打个问号。如果消息无法发送到指定的队列,那么publisherReturns就会发挥作用。记住,如果想应用这个特性,需要将mandatory设置为true。

    @Component
    public class ReturnMsgCallBack implements ReturnCallback{
    
        Logger logger = LoggerFactory.getLogger(this.getClass());
        
        public void returnedMessage(Message message, int replyCode, 
                        String replyText, String exchange, String routingKey) {
            logger.info("消息内容:{}",new String(message.getBody()));
            logger.info("回复文本:{},回复代码:{}",replyText,replyCode);
            logger.info("交换器名称:{},路由键:{}",exchange,routingKey); 
        }
    }
    

    如果我们不小心写错了路由键的名字,那就会调用到这里。

    23:24:27,813  INFO ReturnMsgCallBack:16 - 消息内容:{"createtime":"2018-11-25 23:24:24","password":"1234","role":null,"uid":"516393749815754752","username":"小小沙弥"}
    23:24:27,814  INFO ReturnMsgCallBack:17 - 回复文本:NO_ROUTE,回复代码:312
    23:24:27,814  INFO ReturnMsgCallBack:18 - 交换器名称:user-exchange,路由键:10086_xxx
    
    //这里是发送方确认打印的信息 说投递到交换器成功
    23:24:27,814  INFO PublisherConfirm:17 - 消息投递成功!
    

    有个问题,如同第一个例子,如果写错了路由键的名称,发送方确认会打印ack为false的异常信息,但为什么不会调用到publisherReturns呢?

    如果路由键错误,说明消息压根就没有被接收到。这肯定是一个严重错误,所以RabbitMQ直接把当前通道关闭了。

    Channel shutdown: 
    channel error; protocol method:
    reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost ...
    

    五、监听RabbitMQ服务器状态

    如果你的RabbitMQ服务不是一个集群,那么当网络故障或其他原因导致RabbitMQ服务停掉的时候,我们怎么做呢?当然,你可以在Send方法中加入try/catch,根据catch信息返回你的状态。但有个更好的思路,可以结合使用。
    在创建RabbitMQ服务连接的时候,我们要配置一个Bean,CachingConnectionFactory它有个方法addConnectionListener,我们可以利用它来监听服务器的连接状态。

    public class RabbitMQConnectionListener implements ConnectionListener{
        public void onCreate(Connection connection) {
            System.out.println("服务器已启动...");
        }
        public void onClose(Connection connection) {
            System.out.println("服务器已关闭...");
        }
    }
    

    并在合适的位置,比如Spring容器初始化方法里,加入这么一句rabbitConnectionFactory.addConnectionListener(new RabbitMQConnectionListener());

    这样,我们就可以掌握RabbitMQ服务器的连接状态了,那么我们就可以根据此状态,在生产者方调用send方法的时候,判断此状态。如果未连接,可以先将消息保存到数据库或者缓存中。当连接到RabbitMQ,我们先把缓存的消息拿出来发送,再将此状态重置为已连接。

    六、总结

    本文简单介绍了AMQP协议标准中的相关概念,以及RabbitMQ在Spring中如何正确配置使用持久化消息、发送方模式和返回回调等机制。并在最后,介绍了在Spring中如何监听RabbitMQ的服务器连接状态。总而言之一句话,我们将要怎样使用RabbitMQ,才能保证消息不会丢失。希望本文对你使用RabbitMQ有所帮助!

    相关文章

      网友评论

        本文标题:RabbitMQ如何保证消息的可达性

        本文链接:https://www.haomeiwen.com/subject/qowjqqtx.html