美文网首页
spring-rabbitmq测试

spring-rabbitmq测试

作者: Chandler_珏瑜 | 来源:发表于2020-11-20 17:26 被阅读0次

    一、基本介绍

    spring-rabbitmq-思维导图.png

    1.1 maven依赖

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>
    

    spring-rabbit依赖了amqp-client,同时对其进行了封装,使用CachingConnectionFactory去管理connection,使用spring-retry实现重试机制。

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
    </dependency>
    

    二、Spring Boot RabbitMQ启动流程

    Rabbitmq-starter初始启动流程图.png

    spring boot启动时,会执行RabbitAutoConfiguration的初始化,加载rabbitmq所需要的bean,其中CachingConnectionFactory 在初始化的时候,会创建一个connection,尝试与rabbitmq server建立连接,如果server处于异常状态,将出现如下所示的异常:


    spring boot rabbitmq connection error.png

    如上所示,应用会持续进行connect尝试,直到成功建立连接。如果成功如下所示:


    rabbitmq-client启动成功.png

    然后,就可以进行正常的消息的生产,消息的消费。

    三、生产者

    3.1 发送流程

    //1 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    
    //2 设置RabbitMQ相关信息
    factory.setHost("127.0.0.1");
    factory.setUsername("springcloud");
    factory.setPassword("springcloud");
    factory.setPort(5672);
    
    //3 创建一个新的连接
    Connection connection = factory.newConnection();
    
    //4 创建一个通道
    Channel channel = connection.createChannel();
    
    //5 声明一个队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //6 声明为确认模式
    channel.confirmSelect();
    
    //7 发送消息到队列中
    for (int i = 0; i < 10; i++) {
        String message = "Hello RabbitMQ, I'm chandler" + i + " !";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("Producer Send +'" + message + "'");
    }
    
    //8 关闭通道和连接
    channel.close();
    connection.close();
    

    如上代码所示,是一个标准消息生产者的生命周期,根据整理获得如下生命周期流程图:


    Rabbitmq发送消息流程.png

    Spring-Rabbit将如上流程进行了重新构建,使用 CachingConnectionFactory 实现connection的管理和复用,使用 RetryTemplate 实现消息发送高可用[重试机制],使用 RabbitTemplate 管理消息的发送和接收,发送的核心函数如下所示


    RibbatTemplate#doSend.png

    对上图两个红框进行追踪,整理出如下Spring-Rabbit发送流程的时序图:


    Rabbitmq发送消息-时序图.png

    3.2 确认机制

    我们使用Confirm的时候,首先要设置Channel为Confirm模式,即向broker端发送Confirm.Select。confirmSelect()代码如下:
    Channel主要实现类是 ChannelN

    @Override
    public Confirm.SelectOk confirmSelect()
        throws IOException
    {
        if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;
        return (Confirm.SelectOk)
            exnWrappingRpc(new Confirm.Select(false)).getMethod();
    }
    

    如上所示,当Channel开启Confirm模式的时候,nextPublishSeqNo=1,标记第一条publish的序号。如下是client进行push时候的源码:


    push.png

    如上所示,将publish序号存入一个线程安全的set中进行管理,查看 waitForConfirms(),当超时时间为0时,线程将会一直等待server响应,可以看到如下所示源码:


    waitForConfirms.png

    RabbitTemplate在设置确认机制时,将自身作为listener作为监听器。在server端接收到消息之后应答反馈“收到了“,不过只是确保了消息发送到queue,并不保证消息最终会被消费。


    RabbitTemplate#setupConfirm.png

    四、消费者

    4.1 消费流程

    //获取connection连接
    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    
    //设置RabbitMQ相关信息
    factory.setHost("127.0.0.1");
    factory.setUsername("springcloud");
    factory.setPassword("springcloud");
    factory.setPort(5672);
    //创建一个新的连接
    Connection connection = factory.newConnection();
    
    //创建通道,你必须要声明一个消息消息队列,然后向该队列里推送消息
    Channel channel = connection.createChannel();
    
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
    //在低版本中,new QueueingConsumer();的方式,但是这种方式已经被废弃了,不建议使用
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
            String msg = new String(body, "utf-8");
            System.out.println("msg:" + msg);
        }
    };
    
    boolean autoAck = true;
    channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);
    

    如上代码所示,是一个标准消息消费者的生命周期,根据整理获得如下生命周期流程图:

    Spring-Rabbit将如上流程进行了重新构建,使用 SimpleMessageListenerContainer 管理 listener 实现消息的接收和处理,使用 RetryTemplate 实现消息消费的高可用[重试机制]。并实现完全的注解化,核心注解有@RabbitListener、@RabbitHandler、@EnableRabbit。如下是spring-Rabbit启动一个消费者监听器的简单示例:

    /**
     * 消费者
     */
    @Slf4j
    @Component
    @RabbitListener(queues = "topic.message1")
    public class Receiver {
    
        @RabbitHandler
        public void process(String hello) throws Exception{
            log.info("Receiver : {}", JSON.parseObject(hello, MessageTestContext.class).toString());
        }
    }
    

    4.2 消费者容器

    SimpleMessageListenerContainer 即简单消息监听器容器:

    • 这个类非常强大,我们可以对它进行很多的设置,用对于消费者的配置项,这个类都可以满足。它 有监听单个或多个队列、自动启动[使用注解]、自动声明功能[使用注解]。
    • 可以设置事务特性、事务管理器、事务属性、事务并发、是否开启事务、回滚消息等。
    • 支持动态设置,设置消费者数量、最小最大数量、批消费;消息确认模式、是否重回队列、异常捕获等等。
      通过对核心类 SimpleMessageListenerContainer 的初始化过程,已经start函数进行梳理,整理出如下时序图:


      Spring-Rabbitmq消息消费-时序图.png
    RabbitListenerAnnotationBeanPostProcessor
    RabbitListenerAnnotationBeanPostProcessor.png
    1. 根据注解发现监听器,收集信息
    2. 创建 rabbitAdmin 注册到spring容器中,并与 MethodRabbitListenerEndpoint 绑定
    3. 创建 RabbitListenerContainerFactory 注册到spring容器中
    SimpleMessageListenerContainer
    SimpleMessageListenerContainer.png
    1. RabbitListenerEndpointRegistry 创建消费者容器,并注册spring容器


      消费者容器.png
    2. MessageListenerContainer 的具体实现类是 SimpleMessageListenerContainer

    3. 生命周期被spring-context管理,也可以被AsyncRabbitTemplate 使用

    4. 获取有效的 RabbitAdmin (对rabbitmq-template封装)

    5. 通知唤醒所有消费者容器

    6. 初始化消费者,将封装好的消费者(AsyncMessageProcessingConsumer)放入线程池中

    AsyncMessageProcessingConsumer

    AsyncMessageProcessingConsumer 内部封装了 BlockingQueueConsumer,BlockingQueueConsumer 是具体的消费者。封装的原因是可以方便的管理和监控消费者,同时能记录消费者生命周期过程中的异常状态。

    BlockingQueueConsumer#run

    attemptPassiveDeclarations() 声明目标队列
    channel.basicQos() 如果没有设置自动确认,从队列中预取目标数量的消息
    consumeFromQueue(queueName) 从目标队列中获取消息实体

    4.3 Delivery

    ACK机制是RabbitMQ中处理数据安全性的另外一种设置。
    由于消息在传递过程中无法保证一定到达目的地,需要一种确认机制来对消息是否成功进行确认。
    RabbitMQ通过 basic.delivery来给消费者传递消息,每次发送,都会附带一个delivery tag ,就是说每个channel都会有一个特有的delivery tag。ACK 必须在相同的delivery tag的前提条件下进行确认。
    receiveAndExecute函数
    消费者的消费处理逻辑主要在 SimpleMessageListenerContainer 这个实现类中完成,如下所示,receiveAndExecute 是确认机制实现的入口函数。

    最后一个消息到达,开始进行确认逻辑

    private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {
        // 判断事务管理器是否存在
       if (this.transactionManager != null) {
          try {
             if (this.transactionTemplate == null) {
                 // 如果不存在事务,则新建一个事务管理器,并将当前连接资源和事务绑定
                this.transactionTemplate =
                      new TransactionTemplate(this.transactionManager, this.transactionAttribute);
             }
             return this.transactionTemplate
                   .execute(new TransactionCallback<Boolean>() {
    
                      @Override
                      public Boolean doInTransaction(TransactionStatus status) {
                         RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(
                               new RabbitResourceHolder(consumer.getChannel(), false),
                               getConnectionFactory(), true);
                         try {
                             // 在事务管理器内执行
                            return doReceiveAndExecute(consumer);
                         }
                         catch (RuntimeException e) {
                            prepareHolderForRollback(resourceHolder, e);
                            throw e;
                         }
                         catch (Throwable e) { //NOSONAR
                            // ok to catch Throwable here because we re-throw it below
                            throw new WrappedTransactionException(e);
                         }
                      }
                   });
          }
          catch (WrappedTransactionException e) {
             throw e.getCause();
          }
       }
        // 若已存在事务,则直接执行
       return doReceiveAndExecute(consumer);
    
    }
    

    可以看到是通过事务机制来实现,当处理过程抛出异常的时候进行回滚,执行 BlockingQueueConsumer#rollbackOnExceptionIfNecessary。如下所示:


    doReceiveAndExecute
    rollbackOnExceptionIfNecessary函数

    如果设置AcknowledgeMode为NONE或者MANUAL,将进入basicReject 函数执行逻辑,
    basicReject 函数将向服务端抛deliveryTag ,声明消息未被消费。

    public enum AcknowledgeMode {
        NONE,
        MANUAL,
        AUTO;
        
        public boolean isTransactionAllowed() {
           return this == AUTO || this == MANUAL;
        }
        
        public boolean isAutoAck() {
           return this == NONE;
        }
        public boolean isManual() {
           return this == MANUAL;
        }
        
    }
    

    五、测试

    异常场景服务端模拟:

    • 服务端集群模式:
      • 单节点
      • 集群
    • 服务端节点状态:
    1. 节点宕机
    2. 节点启动
    3. 节点卡顿
    4. 节点恢复
      异常场景客户端模拟:
    • RabbitMQ工作模式
    1. Publish/Subscribe 发布订阅、exchange
    2. Routing 路由
    • 客户端动作
      • 一、topic创建
      • 二、消息发送
      • 三、消息消费
    • 确认模式,自动确认
    1. 生产者确认
    2. 消费者确认
    • 重试机制 有
    1. 重试策略
    2. 生产时异常
    3. 消费时异常
      • Redelivered:BlockingQueueConsumer#rollbackOnExceptionIfNecessary
    • 异常统计:
      • A.超时异常
      • B.心跳异常
      • C.socket中断异常
      • D.connection拒绝
      • E.connection阻塞
      • F.异常堆栈
        序号 场景 客户端动作 CPU memory single cluster 结果
        1 3、5、6、7、8 一、二、三 充沛 充沛 是 只出现异常A,其它异常都未出现
        2 3、6、7、8、 二、三 充沛 不足 是 只出现异常A,其它异常都未出现
        3 1、2、6、7、8 一、二、三 充沛 不足 是 出现D,并可能出现socket写入失败
        4 3、4、5、7、8 一、二、三 不足 充沛 是 异常A,E,各种类型的超时

    六、总结

     本次工作的核心内容包含,梳理spring-rabbitmq生产者生产流程和消费者消费过程,测试客户端与服务端交互,如下图所示:

     一种成熟技术一般都经历了好几年时间的洗礼,功能变得不断完善,内部原理与执行逻辑也变得更加复杂。我们作为使用者,很难对其做到完全掌握,每个人的掌握的程度也有浅有深。对spring-rabbit的梳理,可以来带以下几个好处:

    1. 梳理清楚生产者的生产流程和消费者的消费过程,对于理解一些问题的产生带来便利。当未来可能产生的问题,可以降低理解成本,有利于更快的定位问题源。
    2. 梳理整理出的文档可以落实到wiki,一则方便记录以便后续更新完善,二则有利于降低组内成员的理解差差距,方便大家在一个理解层面上沟通。
    3. 技术的利用逐步升华,组件的开发逐步优化,应该有一个更优的方法论持续推进。以下是我个人整理的技术利用的方法论,如果有补充的可以联系我

    相关文章

      网友评论

          本文标题:spring-rabbitmq测试

          本文链接:https://www.haomeiwen.com/subject/fltdmktx.html