美文网首页
Spring rabbit 生产者发送消息乱序

Spring rabbit 生产者发送消息乱序

作者: 瞿大官人 | 来源:发表于2020-05-14 21:07 被阅读0次

写在前面

当使用spring rabbit的时候我以为生产者发送消息是有序的,直到某天线上出现问题(支付消息在前,下单消息在后)才下定决心要看看spring rabbit的源码。

CachingConnectionFactoryd 的基本思路

image.png

有一个LinkedList当作缓存channel的队列,线程获取channel的时候都是从队头拿,而且每次只有一个线程进入临界区拿channel(channel不能被多个线程共享),只有上一个线程退出临界区,后一个线程才能进入。当线程使用完channel后会将channel放入队尾。

rabbitmq只保证在同一个channel中,生产者发送消息到队列是有序的。详情见rabbitmq-message-order-of-delivery
所以spring rabbitmq的这种设计就有可能使用不同的channel 发送下单、支付消息,接着在队列里面支付消息在前,下单消息在后。

源码

获取channel

CachingConnectionFactory.getChannel 这是获取channel的入口

                // 缓存channel队列
        LinkedList<ChannelProxy> channelList;
        if (this.cacheMode == CacheMode.CHANNEL) {
            channelList = transactional ? this.cachedChannelsTransactional
                    : this.cachedChannelsNonTransactional;
        }
        else {
            channelList = transactional ? this.allocatedConnectionTransactionalChannels.get(connection)
                    : this.allocatedConnectionNonTransactionalChannels.get(connection);
        }
        ChannelProxy channel = null;
        if (connection.isOpen()) {
                    //临界区
            synchronized (channelList) {
                while (!channelList.isEmpty()) {
                                  // 从对头获取channel
                    channel = channelList.removeFirst();
                    if (logger.isTraceEnabled()) {
                        logger.trace(channel + " retrieved from cache");
                    }
                    if (channel.isOpen()) {
                        break;
                    }
                    else {
                        
                            Channel target = channel.getTargetChannel();
                            if (target != null) {
                                target.close();
                            }
                        
                        
                        channel = null;
                    }
                }
            }
        }
      //队列中没有channel ,则新建channel
        if (channel == null) {
                        
                channel = getCachedChannelProxy(connection, channelList, transactional);
              }
        return channel;

channel放进队尾

CachingConnectionFactory.CachedChannelInvocationHandler.logicalClose

        .....
            // Allow for multiple close calls...
            if (!this.channelList.contains(proxy)) {
              // channel 放入队尾
                this.channelList.addLast(proxy);
                setHighWaterMark();
            }
...

最后

如果你的业务对生产者发送消息的顺序比较敏感,使用spring rabbit的时候就要小心了。

相关文章

  • Spring rabbit 生产者发送消息乱序

    写在前面 当使用spring rabbit的时候我以为生产者发送消息是有序的,直到某天线上出现问题(支付消息在前,...

  • spring rabbit配置生产者发送

    前言:由于被测系统中的接口是离线处理,用到的中间件为rabbitmq,故这里一下我用到的测试类编写方式。 首先是x...

  • rabbit入门(三)工作队列轮询分发和公平分发

    rabbit是一个消息代理和队列服务器。在实际开发当中,生产者发送消息是简单的,只需将消息放入消息中间件的时候就好...

  • springboot-rabbitmq之hello-world(

    概念介绍 这里引用rabbit官网的一张图 大概意思就是生产者把消息发送到队列然后消费者消费消息 springbo...

  • Kafka 生产者概述

    生产者:往消息队列里推送消息的应用 发送消息的过程 Kafka 生产者发送消息的过程: Kafka 会将发送消息包...

  • RabbitMQ 参数属性

    RabbitMQ rabbit message queue的基本概念 producer:消息生产者,就是投递消息的...

  • RocketMQ生产者

    书接上文,今天我们来说说RocketMQ生产者发送消息 生产者发送消息 生产者发送消息的主要流程图如上图所示。具体...

  • 阿里云rocketMQ接入

    核心概念 Topic:消息主题,一级消息类型,生产者向其发送消息。 生产者:也称为消息发布者,负责生产并发送消息至...

  • Kafka生产者

    目录 生产者是如何向kafka发送消息的 生产者发送消息有几种方式 生产者怎么通过配置进行调优 生产者是如何序列化...

  • 5.发送和消费信息

    文章参考:Rabbit实战指南 发送消息 ​ 如果要发送一个消息,可以使用Channel类的basicPubl...

网友评论

      本文标题:Spring rabbit 生产者发送消息乱序

      本文链接:https://www.haomeiwen.com/subject/prixohtx.html