美文网首页
RabbitMQ 使用教程

RabbitMQ 使用教程

作者: 木叶苍蓝 | 来源:发表于2023-02-15 16:10 被阅读0次

    Rabbit 简介

    RabbitMQ 是一个有 Erlang 语言开发的 AMQP 的开源实现。
    AMQP:Advanced Message Queue 高级消息队列协议。它是应用层协议的一个开发标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品,开发语言等条件限制。

    队列(Queue)

    队列是常用的数据结构之一,是一种特殊的线性表,特殊之外在与它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作。进行插入操作的端称为队尾,进行删除操作的端称为队头。

    消息队列(Message Queue)

    消息:计算机 / 应用 间传送的数据单位,可以非常简单,例如只包含文件字符串,也可以很复杂,可以包含嵌入对象。
    消息队列:在消息的传输过程中保存消息的容器。
    消息传输时,先发送到队列,队列的主要目的是提供路由并保证消息的传递,如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功的传递它。
    可以把消息队列理解成快递公司,你需要寄一个物件(消息)给你的朋友,快递公司收到物件会保证物件送的你的朋友手中,可能存在多次寄送才能送达成功的情况,比如第一次送过去,但你的朋友不在家。
    消息队里中间件是分布式系统中重要的组件
    解决 应用耦合异步消息流量削峰 等问题
    实现 高性能高可用可伸缩最终一致性

    RabbitMQ 概念

    6c7db1f271da4f01b2f9033b05afeb3d.jpg
    1. Broke:又称 server,它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。
    2. Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
    3. Queue:消息的载体,每个消息都会被投到一个或多个队列。
    4. Binding:绑定,它的作用就是把 exchange 和 queue 按照路由规则绑定起来。
    5. Routing Key:路由关键字,exchange 根据这个关关键字进行消息投递。
    6. vhost:虚拟机,一个 broker 里面可以有多个 vhost ,用作不同用户的权限分离。
    7. Producer:消息生产者,就是投递消息的程序。
    8. Consumer:消息消费者,就是接受消息的程序。
    9. Channel:消息通道,在客户端的每个连接里,可以建立多个 channel 。几乎所有的操作都在 channel 中完成。

    支持的消息类型

    adb765c1fdae43d7a4db9fa3c3bc2f3d.png
    1、简单模式 Simple
    bc3ab931857f465a9efff3b87a255254.png

    P (produce / publisher):生产者,一个发送消息的用户应用程序。
    C (consumer):消费者,消费和接收的意思,消费者是一个主要用来等待接收消息的用户应用程序。
    队列(红色区域):rabbitmq 内部类似于邮箱的一个概念。虽然消息流经 rabbitmq 和你的应用程序,但是它们只能存储在队里中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息多一个队列,许多消费者可以尝试从一个队列接收数据。

    总之:生产者将消息发送到队里,消费者从队列中获取消息,队列是存储消息的缓冲区。

    1、工作模式 Work

    工作模式又称竞争消费者模式
    主要思想就是避免执行资源密集型认识时,必须等待它执行完成。相反我们需要稍后完成任务,我们将任务封装为消息并将其发送到队列。在后台运行的工作进程获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。

    5b910530353b440a8e9dd1b89b05803c.png

    P:生产者,任务的发布者
    C1:消费者1
    C2:消费者2

    如何避免消息堆积?

    1. 采用 workqueue,多个消费者监听同一队列。
    2. 接收到消息后,通过线程池,异步消费。
    3、发布订阅模式(广播机制)(扇型交换机)

    1个生产者,多个消费者,每一个消费者都有自己的队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列都要绑定到交换机,生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的。(如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

    b2cb58d7654d474fadf58d916d2ebcc6.png
    交换机(Exchange)【蓝色圆圈x】

    一方面接收生产者发送的消息。另一方面知道如何处理消息,如递交个某个特别队列,递交给所有队列,还是将消息丢弃。到底如何操作,取决于交换机的类型。Exchange 只负责转发消息,不具备存储消息的能力。

    Exchange 类型有以下三种:

    1、Direct Exchange(直连型交换机):
    根据消息携带的路由键将消息投递给对应队列。
    大致流程为:有一个队列绑定到一个直连交换机上,同时赋予一个路由键 key。然后当一个消息携带着路由键 key,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值 key 去寻找绑定值也是 key 的队列。
    2、Fanout Exchange (扇型交换机)
    这个交换机没有路由键概念,就算你绑定了路由键也是无效的。这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
    3、Topic Exchange (主题交换机)
    这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间有规则

    (星号 *)用来表示一个单词(必须出现一个单词)
    (井号 #)用来表示任意数量的单词

    举例:
    队列Q1 绑定键为 .TT.
    队列Q2 绑定键为 TT.#
    如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到
    如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到

    当一个队列的绑定键为 "#" 的时候,这个队列将会无视消息的路由键,接收所有的消息。
    当"*" 和 "#" 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有直连交换机的行为。
    所以主题交换机也就实现了扇型交换机的功能,和直连交换机的功能。
    除了以上常用的交换机,还有 Header Exchange(头交换机),Default Exchange(默认交换机)和 (Dead Letter Exchange)(死信交换机)

    4、路由模式(直连交换机)

    在广播模式中,生产者发布消息,所有消费者都可以获取所有消息。
    在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到 Direct 类型的 Exchange 发送消息时,也必须指定消息的 routing key。


    0576d00349b04de9b67bdb42a3912221.png

    P:生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key。
    X:Exchange (交换机),接收生产者的消息,然后把消息递交给 routing key 完全匹配的队列
    C1:消费者,其所在队列指定了需要routing key 为 error 的消息。
    C2:消费者,其所在队列指定了需要 routing key 为 info,error,warning 的消息。

    在这种情况下,一个消息在发布时指定了路由键为 error 将只会被 c1 消耗。路由键为 info,error, warning 的消息都将被 c2 消耗,其他消息都将被丢失。

    5、主题模式(Topic )(主题交换机)

    Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符。


    7bb76ef2d5fc485a85a4a6049b74ae3d.png

    消息持久化

    目的是为了避免消息丢失
    消息丢失:

    • 消费者的 ACK 机制。可以防止消费者丢失消息。

    消费者领取消息后,还没执行操作就挂掉了?或者抛出了异常?消息消费失败,但是 RabbitMQ 无从得知,这样消息就丢失了!
    因此,RabbitMQ 有一个 ACK 机制。当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。不过这种回执 ACK 分两种情况:

    1. 自动 ACK : 消息一旦被接收,消费者自动发送 ACK
    2. 手动 ACK:消息接收后,不会发送 ACK ,需要手动调用。
    • 如果在消费者消费之前,MQ 就宕机了,消息就没了。

    消息持久化的前提是:队列,Exchange 都持久化。

    • Exchange 持久化:
    // 获取通道
    Channel channel = connection.createChannel();
    // 声明 exchange ,指定类型为 topic
    channel.exchangeDeclare(EXCHANGE_NAME, "topic", true)
    // 消息内容
    
    • 队列持久化
    // 获取通道
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, true, false, false, false)
    
    • 消息持久化
    // 发送消息,并且指定 routing key 为 insert 代表新增
    channel.basicPublish(EXCHANGE_NAME, routing_key="item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    System.out.println(" [商品服务:] Sent ‘ " + message + " ’ ");
    

    代码示例

    1、引入依赖
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.5.13</version>
    </dependency>
    
    2、修改配置文件
    spring:
      rabbitmq:
        host: 127.0.0.1 #ip
        port: 5672      #端口
        username: guest #账号
        password: guest #密码
        virtualHost:    #链接的虚拟主机
        addresses: 127.0.0.1:5672     #多个以逗号分隔,与host功能一样。
        requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s
        publisherConfirms: true  #发布确认机制是否启用
        publisherReturns: #发布返回是否启用
        connectionTimeout: #链接超时。单位ms。0表示无穷大不超时
        ### ssl相关
        ssl:
          enabled: #是否支持ssl
          keyStore: #指定持有SSL certificate的key store的路径
          keyStoreType: #key store类型 默认PKCS12
          keyStorePassword: #指定访问key store的密码
          trustStore: #指定持有SSL certificates的Trust store
          trustStoreType: #默认JKS
          trustStorePassword: #访问密码
          algorithm: #ssl使用的算法,例如,TLSv1.1
          verifyHostname: #是否开启hostname验证
        ### cache相关
        cache:
          channel: 
            size: #缓存中保持的channel数量
            checkoutTimeout: #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
          connection:
            mode: #连接工厂缓存模式:CHANNEL 和 CONNECTION
            size: #缓存的连接数,只有是CONNECTION模式时生效
        ### listener
        listener:
           type: #两种类型,SIMPLE,DIRECT
           ## simple类型
           simple:
             concurrency: #最小消费者数量
             maxConcurrency: #最大的消费者数量
             transactionSize: #指定一个事务处理的消息数量,最好是小于等于prefetch的数量
             missingQueuesFatal: #是否停止容器当容器中的队列不可用
             ## 与direct相同配置部分
             autoStartup: #是否自动启动容器
             acknowledgeMode: #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
             prefetch: #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量
             defaultRequeueRejected: #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
             idleEventInterval: #container events发布频率,单位ms
             ##重试机制
             retry: 
               stateless: #有无状态
               enabled:  #是否开启
               maxAttempts: #最大重试次数,默认3
               initialInterval: #重试间隔
               multiplier: #对于上一次重试的乘数
               maxInterval: #最大重试时间间隔
           direct:
             consumersPerQueue: #每个队列消费者数量
             missingQueuesFatal:
             #...其余配置看上方公共配置
         ## template相关
         template:
           mandatory: #是否启用强制信息;默认false
           receiveTimeout: #`receive()`接收方法超时时间
           replyTimeout: #`sendAndReceive()`超时时间
           exchange: #默认的交换机
           routingKey: #默认的路由
           defaultReceiveQueue: #默认的接收队列
           ## retry重试相关
           retry: 
             enabled: #是否开启
             maxAttempts: #最大重试次数
             initialInterval: #重试间隔
             multiplier: #失败间隔乘数
             maxInterval: #最大间隔
    

    简单模式

    1、配置类
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 简单模式配置
     */
    @Configuration
    public class RabbitMqConfig {
        
        public static final String QUEUE_NAME = "my_queue"; //队列名称
    
        /**
         * Queue(队列)
         */
        @Bean
        public Queue simpleQueue() {
           /**
             * name:队列名称。
             * durable:是否持久化,默认是false,持久化队列(内部会有一个actualName: 队列的真实名称,默认用name参数,如果name为空,则根据规则生成一个)
             * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
             * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
             * arguments:设置队列的属性参数
             *  1、x-message-ttl:消息的过期时间,单位:毫秒;
             *  2、x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
             *  3、x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
             *  4、x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
             *  5、x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
             *  6、x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
             *  7、x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
             *  8、x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
             *  9、x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
             * 10、x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
             * 11、x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
             */
            return new Queue(QUEUE_NAME, true, false, false, null);
        }
    }
    
    2、消费者
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import java.util.Map;
    
    /**
     * 简单模式
     * 消息消费者
     */
    @Component
    @RabbitListener(queues = "my_queue")//监听的队列名称
    //@RabbitListener(queuesToDeclare = @Queue("simple_queue"))  //如果simple_queue队列不存在,则创建simple_queue队列。默认队列是持久化,非独占式的
    public class SimpleConsumer {
    
        //消费者如果监听到消息队列有消息传入,则会自动消费
        @RabbitHandler
        public void receive(Map message) {
            System.out.println("简单模式 -> 消费者收到map类型消息  : " + testMessage.toString());
        }
        
        @RabbitHandler
        public void receive2(String message) {
            System.out.println("简单模式 -> 消费者收到string类型消息  : " + testMessage.toString());
        }
    }
    

    @RabbitListener 注解属性的作用:
    queuesToDeclare:如果 simple_queue 队列不存在,则会自动创建 simple_queue 队列。默认队列是持久化,非独占式的。
    queues:里面的队列必须存在,否则就会报错。
    @RabbitListener(queues = {"simple_queue2"}),如果队列 simple_queue2 不存在,那么启动消费者就会报错
    注意:
    @RabbitListener 既可以标记在类上,也可以标记在方法上
    标记在类上:需配合 @RabbitHandler 注解一起使用。当有收到消息的时候,就交给@RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型(入参类型进行决定)
    标记在方法上:就有指定的方法进行处理

    3、生产者
    /**
     * 简单模式
     * 消息生产者
     */
    @Controller
    @RequestMapping(value = "simple")
    public class SimpleProducer {
    
        @Autowired
        RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法
    
        /**
         * 向rabbitMq发送消息
         */
        @RequestMapping(value = "sendMsg")
        @ResponseBody
        public String send(String messageId,String messageData) {
            SimpleDateFormat simpleDateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String createTime = simpleDateFormat.format(new Date());
            Map<String,Object> map=new HashMap<>();
            map.put("messageId",messageId);
            map.put("messageData",messageData);
            map.put("createTime",createTime);
            //将消息发送到队列my_queue中
            rabbitTemplate.convertAndSend("my_queue", map);
            //receive将接收到消息
            System.out.println("rabbitMQ 简单模式消息发送成功!");
            return "true";
        }
    }
    
    消息手动确认

    1、yml 文件添加配置

    listener:
      simple:
        concurrency: 1
        max-concurrency: 1
        acknowledge-mode: manual
        prefetch: 1
    
    消费者代码修改
    /**
     * 简单模式
     * 消息消费者
     */
    @Component
    public class SimpleConsumer {
    
        @RabbitListener(queues = "my_queue")//监听的队列名称
        public void process(Message message, Channel channel) throws IOException {
            String str = new String(message.getBody());
            JSONObject msgData = (JSONObject) JSON.parse(str);
            Object messageId = msgData.get("messageId");
            if (null==messageId || messageId.toString().equals("")) {
                /**
                 * 有异常就拒收消息
                 * basicNack(long deliveryTag, boolean multiple, boolean requeue)
                 * deliveryTag:当前消息在队列中的的索引;
                 * multiple:为true的话就是批量确认
                 * requeue:true将消息重返当前消息队列,重新发送给消费者;
                 *         false将消息丢弃
                 */
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                System.out.println("简单模式 -> 消费者拒收消息 : " + msgData.toString());
            }else {
                /**
                 * 没有异常就确认消息
                 * basicAck(long deliveryTag, boolean multiple)
                 * deliveryTag:当前消息在队列中的的索引;
                 * multiple:为true的话就是批量确认
                 */
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                System.out.println("简单模式 -> 消费者收到消息  : " + msgData.toString());
            }
        }
    }
    
    消息确认机制

    1、自动确认
    这也是默认的消息确认情况。AcknowledgeMode.NONE
    RabbitMQ 成功将消息发出(即将消息成功写入 TCP Socket)中立即认为本次投递已经被正确处理,不管消费者是否成功处理本次投递。
    所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。一般这种情况我们都是使用 try catch 捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
    2、手动确认
    这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
    消费者收到消息后,手动调用 basic.ack / basic.nack / basic.reject 后,RabbitMQ 收到这些消息后,才认为本次投递成功。

    • basic.ack 用于肯定确认
    • basic.nack 用于否定确认(注意:这是 AMQP 0-9-1 的 RabbitMQ扩展)
      -basic.reject 用于否定确认,但与 basic.nack 相比有一个限制:一次只能拒绝单条消息。

    channel.basicReject(deliveryTag, true):拒绝消费当前消息
    第一个参数是当前消息在队列中的索引
    第二参数传入 true ,就是将数据重新丢会队列里,那么下次还会消费这个消息。
    第二参数传入 false,就是告诉服务器,我们已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行,下次不想再消费这条消息了。
    使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch 异常再拒绝入列,选择是否重新入列。如果使用不当会导致一些每次都被你重入列的消息一直消费 - 入列 - 消费 - 入列 这样循环,会导致消息积压。

    channel.basicNack(delivertTag, false, true):否定消费确认
    第一个参数:是当前消息在队列中的索引
    第二个参数:指是否针对多条消息,如果是 true,也就是说一次性针对当前通道的消息 tagID 小于当前这条消息的,都拒绝确认。
    第三个参数:指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。
    同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能考虑不周出现消息一直被重新丢回去的情况,导致积压。

    4、消息回调

    ConfirmCallback:当消息到达交换机触发回调
    ReturnsCallback:消息(带路由键 routingKey) 到达交换机,与交换机的所有绑定键进行匹配,触发回调。
    若要使用消息回调:

    1. 修改配置
    publisher-confirm-type: correlated
    publisher-returns: true
    
    1. 设置 mandatory
      ···
      设置rabbitTemplate的mandatory为true 或者在配置中设置 rabbitmq.template.mandatory=true
    配置文件新增代码:
    

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    rabbitTemplate.setMandatory(true);
    //确认消息发送到交换机
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    System.out.println("=======================> ConfirmCallback <=======================");
    System.out.println("ConfirmCallback ===>"+"相关数据:"+correlationData);
    System.out.println("ConfirmCallback ===>"+"确认情况:"+ack);
    System.out.println("ConfirmCallback ===>"+"原因:"+cause);
    }
    });
    //确认消息已发送到队列
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
    Message message = returnedMessage.getMessage();
    int replyCode = returnedMessage.getReplyCode();
    String replyText = returnedMessage.getReplyText();
    String exchange = returnedMessage.getExchange();
    String routingKey = returnedMessage.getRoutingKey();
    System.out.println("=======================> ReturnsCallback <=======================");
    System.out.println("ReturnCallback ===>"+"消息:"+message.toString());
    System.out.println("ReturnCallback ===>"+"回应码:"+replyCode);
    System.out.println("ReturnCallback ===>"+"回应信息:"+replyText);
    System.out.println("ReturnCallback ===>"+"交换机:"+exchange);
    System.out.println("ReturnCallback ===>"+"路由键:"+routingKey);
    }
    });
    return rabbitTemplate;
    }

    ###### 工作模式
    运行许多消费者,任务在他们之间共享,但是一个消息只能被一个消费者获取。设置 prefetchCount 值为1。这告诉 RabbitMQ 一次不要向消费者发送多于一条消息。
    换句话说,不要向消费者发送新消息,直到它处理并确认了前一个消息。相反,它会将其分派给不是忙碌 的下一个消费者。
    当有多个消费者时,我们的消息会被哪个消费者消费呢?我们又该如何均衡消费者消费消息的多少呢?
    主要有两种模式:
    轮询模式的分发: 一个消费者一条,按均分配(关闭手动应答,开启自动应答)
    公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配(关闭自动应答,开启手动应答)
    1、 yml 文件添加 prefetch
    

    listener:
    simple:
    prefetch: 1

    2、创建两个消费者(生产者,配置类不变)
    

    /**

    • 简单模式

    • 消息消费者
      */
      @Component
      public class SimpleConsumer {

      @RabbitListener(queues = "my_queue")//监听的队列名称
      public void process(Message message, Channel channel) throws IOException, InterruptedException {
      String str = new String(message.getBody());
      JSONObject msgData = (JSONObject) JSON.parse(str);
      Object messageId = msgData.get("messageId");
      if (null==messageId || messageId.toString().equals("")) {
      /**
      * 有异常就拒收消息
      * basicNack(long deliveryTag, boolean multiple, boolean requeue)
      * deliveryTag:当前消息在队列中的的索引;
      * multiple:为true的话就是批量确认
      * requeue:true将消息重返当前消息队列,重新发送给消费者;
      * false将消息丢弃
      /
      channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
      System.out.println("简单模式 -> 消费者拒收消息 : " + msgData.toString());
      }else {
      /
      *
      * 没有异常就确认消息
      * basicAck(long deliveryTag, boolean multiple)
      * deliveryTag:当前消息在队列中的的索引;
      * multiple:为true的话就是批量确认
      */
      Thread.sleep(2000);
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
      System.out.println("简单模式 process -> 消费者收到消息 : " + msgData.toString());
      }
      }

      @RabbitListener(queues = "my_queue")//监听的队列名称
      public void process2(Message message, Channel channel) throws IOException, InterruptedException {
      String str = new String(message.getBody());
      JSONObject msgData = (JSONObject) JSON.parse(str);
      Object messageId = msgData.get("messageId");
      if (null==messageId || messageId.toString().equals("")) {
      /**
      * 有异常就拒收消息
      * basicNack(long deliveryTag, boolean multiple, boolean requeue)
      * deliveryTag:当前消息在队列中的的索引;
      * multiple:为true的话就是批量确认
      * requeue:true将消息重返当前消息队列,重新发送给消费者;
      * false将消息丢弃
      /
      channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
      System.out.println("简单模式 -> 消费者拒收消息 : " + msgData.toString());
      }else {
      /
      *
      * 没有异常就确认消息
      * basicAck(long deliveryTag, boolean multiple)
      * deliveryTag:当前消息在队列中的的索引;
      * multiple:为true的话就是批量确认
      */
      Thread.sleep(5000);
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
      System.out.println("简单模式 process2 -> 消费者收到消息 : " + msgData.toString());
      }
      }
      }

    
    #### 订阅模型-Fanout
    ###### 配置类
    

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**

    • 订阅模式-fanout

    • 扇型交换机

    • 创建三个队列 :fanout_queue1 fanout_queue2 fanout_queue3

    • 将三个队列都绑定在交换机 fanout_exchange 上

    • 因为是扇型交换机, 路由键无需配置,配置也不起作用
      */
      @Configuration
      public class FanoutRabbitMqConfig {
      //队列名称常量
      public static final String QUEUE_NAME1 = "fanout_queue1";
      public static final String QUEUE_NAME2 = "fanout_queue2";
      public static final String QUEUE_NAME3 = "fanout_queue3";
      //交换机名称常量
      public static final String EXCHANGE_NAME = "fanout_exchange";

      /**

      • 声明队列 Queue
        */
        @Bean
        public Queue fanoutQueue1() {
        return new Queue(QUEUE_NAME1, true, false, false, null);
        }

      /**

      • 声明队列 Queue
        */
        @Bean
        public Queue fanoutQueue2() {
        return new Queue(QUEUE_NAME2, true, false, false, null);
        }

      /**

      • 声明队列 Queue
        */
        @Bean
        public Queue fanoutQueue3() {
        return new Queue(QUEUE_NAME3, true, false, false, null);
        }

      /**

      • 声明交换机
        */
        @Bean
        FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME,true,false,null);
        }

      /**

      • 交换机队列绑定
        */
        @Bean
        Binding bindingExchange1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
        }

      @Bean
      Binding bindingExchange2() {
      return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
      }

      @Bean
      Binding bindingExchange3() {
      return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
      }
      }

    ###### 生产者
    声明 Exchange,不再声明 Queue
    发送消息到 Exchange,不再发送到 Queue
    

    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    @Controller
    @RequestMapping(value = "fanout")
    public class FanoutProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    /**
     * 向rabbitMq发送消息
     */
    @RequestMapping(value = "sendMsg")
    @ResponseBody
    public String send(String messageId,String messageData) {
        SimpleDateFormat simpleDateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String createTime = simpleDateFormat.format(new Date());
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("messageId",messageId);
        jsonObject.put("messageData",messageData);
        jsonObject.put("createTime",createTime);
        //将消息发送到队列my_queue中
        rabbitTemplate.convertAndSend("fanout_exchange",null, jsonObject.toString());
        System.out.println("rabbitMQ 简单模式消息发送成功!");
        return "true";
    }
    

    }

    ###### 消费者
    

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import java.io.IOException;

    @Component
    public class FanoutReceiver {

    @RabbitListener(queues = "fanout_queue1")
    public void receive1(String msgData, Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        System.out.println("fanout_queue1消费者收到消息  : " +msgData);
    }
    
    @RabbitListener(queues = "fanout_queue2")
    public void receive2(String msgData, Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        System.out.println("fanout_queue2消费者收到消息  : " +msgData);
    }
    
    @RabbitListener(queues = "fanout_queue3")
    public void receive3(String msgData, Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        System.out.println("fanout_queue3消费者收到消息  : " +msgData);
    }
    

    }

    ###### 发送消息后台结果
    

    rabbitMQ 简单模式消息发送成功!
    fanout_queue3消费者收到消息 : {"createTime":"2022-10-06 23:02:29","messageId":"55","messageData":"bhhh"}
    fanout_queue1消费者收到消息 : {"createTime":"2022-10-06 23:02:29","messageId":"55","messageData":"bhhh"}
    fanout_queue2消费者收到消息 : {"createTime":"2022-10-06 23:02:29","messageId":"55","messageData":"bhhh"}
    rabbitMQ 简单模式消息发送成功!
    fanout_queue2消费者收到消息 : {"createTime":"2022-10-06 23:03:03","messageId":"66","messageData":"就立刻"}
    fanout_queue1消费者收到消息 : {"createTime":"2022-10-06 23:03:03","messageId":"66","messageData":"就立刻"}
    fanout_queue3消费者收到消息 : {"createTime":"2022-10-06 23:03:03","messageId":"66","messageData":"就立刻"}

    #### 订阅模型-Direct
    ###### 配置类
    

    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**

    • 订阅模式-direct

    • 直连交换机

    • 创建三个队列 :direct_queue1 direct_queue2 direct_queue3

    • 将三个队列都绑定在交换机 direct_exchange 上
      */
      @Configuration
      public class DirectRabbitMqConfig {
      //队列名称常量
      public static final String QUEUE_NAME1 = "direct_queue1";
      public static final String QUEUE_NAME2 = "direct_queue2";
      public static final String QUEUE_NAME3 = "direct_queue3";
      //交换机名称常量
      public static final String EXCHANGE_NAME = "direct_exchange";

      /**

      • 声明队列 Queue
        */
        @Bean
        public Queue directQueue1() {
        return new Queue(QUEUE_NAME1, true, false, false, null);
        }

      /**

      • 声明队列 Queue
        */
        @Bean
        public Queue directQueue2() {
        return new Queue(QUEUE_NAME2, true, false, false, null);
        }

      /**

      • 声明队列 Queue
        */
        @Bean
        public Queue directQueue3() {
        return new Queue(QUEUE_NAME3, true, false, false, null);
        }

      /**

      • 声明交换机
        */
        @Bean
        DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_NAME,true,false,null);
        }

      /**

      • 交换机队列绑定
        */
        @Bean
        Binding bindingDirectExchange1() {
        return BindingBuilder.bind(directQueue1()).to(directExchange()).with("info");
        }

      @Bean
      Binding bindingDirectExchange2() {
      return BindingBuilder.bind(directQueue2()).to(directExchange()).with("error");
      }

      @Bean
      Binding bindingDirectExchange3() {
      return BindingBuilder.bind(directQueue3()).to(directExchange()).with("warn");
      }
      }

    ###### 生产者
    

    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    @Controller
    @RequestMapping(value = "direct")
    public class DirectProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    /**
     * 向rabbitMq发送消息
     */
    @RequestMapping(value = "sendMsg")
    @ResponseBody
    public String send(String messageId,String messageData,String routingKey) {
        SimpleDateFormat simpleDateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String createTime = simpleDateFormat.format(new Date());
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("messageId",messageId);
        jsonObject.put("messageData",messageData);
        jsonObject.put("createTime",createTime);
        //将消息发送到交换机
        rabbitTemplate.convertAndSend("direct_exchange",routingKey, jsonObject.toString());
        System.out.println("rabbitMQ 简单模式消息发送成功!");
        return "true";
    }
    

    }

    ###### 消费者
    

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import java.io.IOException;

    @Component
    public class DirectReceiver {

    @RabbitListener(queues = "direct_queue1")
    public void receive1(String msgData, Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        System.out.println("direct_queue1消费者收到消息  : " +msgData);
    }
    
    @RabbitListener(queues = "direct_queue2")
    public void receive2(String msgData, Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        System.out.println("direct_queue2消费者收到消息  : " +msgData);
    }
    
    @RabbitListener(queues = "direct_queue3")
    public void receive3(String msgData, Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        System.out.println("direct_queue3消费者收到消息  : " +msgData);
    }
    

    }

    ###### 发送消息后台结果
    

    消息发送成功!路由键:warn
    direct_queue3消费者收到消息 : {"createTime":"2022-10-07 11:31:45","messageId":"1","messageData":"sacascsac"}
    消息发送成功!路由键:info
    direct_queue1消费者收到消息 : {"createTime":"2022-10-07 11:31:54","messageId":"1","messageData":"sacascsac"}
    消息发送成功!路由键:error
    direct_queue2消费者收到消息 : {"createTime":"2022-10-07 11:31:59","messageId":"1","messageData":"sacascsac"}
    消息发送成功!路由键:error1 (消息被丢弃)

    #### 订阅模型-Topic
    ###### 配置类
    

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**

    • 订阅模式-topic

    • 主题交换机

    • 创建三个队列 :topic_queue1 topic_queue2 topic_queue3

    • 将三个队列都绑定在交换机 topic_exchange 上
      */
      @Configuration
      public class TopicRabbitMqConfig {
      //队列名称常量
      public static final String QUEUE_NAME1 = "topic_queue1";
      public static final String QUEUE_NAME2 = "topic_queue2";
      public static final String QUEUE_NAME3 = "topic_queue3";
      //交换机名称常量
      public static final String EXCHANGE_NAME = "topic_exchange";

      /**

      • 声明队列 Queue
        */
        @Bean
        public Queue topicQueue1() {
        return new Queue(QUEUE_NAME1, true, false, false, null);
        }

      /**

      • 声明队列 Queue
        */
        @Bean
        public Queue topicQueue2() {
        return new Queue(QUEUE_NAME2, true, false, false, null);
        }

      /**

      • 声明队列 Queue
        */
        @Bean
        public Queue topicQueue3() {
        return new Queue(QUEUE_NAME3, true, false, false, null);
        }

      /**

      • 声明交换机
        */
        @Bean
        TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE_NAME,true,false,null);
        }

      /**

      • 交换机队列绑定
      • 路由键使用通配符
        */
        @Bean
        Binding bindingTopicExchange1() {
        //消息携带的路由键是以"topic."开头,就会分发到该队列
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.#");
        }

      @Bean
      Binding bindingTopicExchange2() {
      //消息携带的路由键是包含.topic.,就会分发到该队列
      return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(".topic.");
      }

      @Bean
      Binding bindingTopicExchange3() {
      //消息携带的路由键是以".topic"结尾,就会分发到该队列
      return BindingBuilder.bind(topicQueue3()).to(topicExchange()).with("#.topic");
      }
      }

    ###### 生产者
    

    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;

    import java.text.SimpleDateFormat;
    import java.util.Date;

    @Controller
    @RequestMapping(value = "topic")
    public class TopicProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    /**
     * 向rabbitMq发送消息
     */
    @RequestMapping(value = "sendMsg")
    @ResponseBody
    public String send(String messageId,String messageData,String routingKey) {
        SimpleDateFormat simpleDateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String createTime = simpleDateFormat.format(new Date());
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("messageId",messageId);
        jsonObject.put("messageData",messageData);
        jsonObject.put("createTime",createTime);
        //将消息发送到队列my_queue中
        rabbitTemplate.convertAndSend("topic_exchange",routingKey, jsonObject.toString());
        System.out.println("消息发送成功!路由键:"+routingKey);
        return "true";
    }
    

    }

    ###### 消费者
    

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import java.io.IOException;

    @Component
    public class TopicReceiver {

    @RabbitListener(queues = "topic_queue1")
    public void receive1(String msgData, Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        System.out.println("topic_queue1消费者收到消息  : " +msgData);
    }
    
    @RabbitListener(queues = "topic_queue2")
    public void receive2(String msgData, Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        System.out.println("topic_queue2消费者收到消息  : " +msgData);
    }
    
    @RabbitListener(queues = "topic_queue3")
    public void receive3(String msgData, Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        System.out.println("topic_queue3消费者收到消息  : " +msgData);
    }
    

    }

    ###### 发送消息后台结果
    

    消息发送成功!路由键:topic.one
    topic_queue1消费者收到消息 : {"createTime":"2022-10-07 12:05:17","messageId":"1","messageData":"sacascsac"}
    消息发送成功!路由键:A.topic.B
    topic_queue2消费者收到消息 : {"createTime":"2022-10-07 12:05:46","messageId":"1","messageData":"sacascsac"}
    消息发送成功!路由键:C.topic
    topic_queue3消费者收到消息 : {"createTime":"2022-10-07 12:06:02","messageId":"1","messageData":"sacascsac"}
    消息发送成功!路由键:SFC.topic.AFBGB
    topic_queue2消费者收到消息 : {"createTime":"2022-10-07 12:06:52","messageId":"1","messageData":"sacascsac"}
    消息发送成功!路由键:S.D.FC.topic.A.S.D.F (消息被丢弃)

    #### 一个简单的消息推送接收的流程
    ![微信截图_20230216160930.png](https://img.haomeiwen.com/i7399010/4a1c61239e36c5d2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
    生产者产生消息,将消息推送到中间方框里面也就是rabbitMq的服务器,然后经过服务器里面的交换机、队列等各种关系将数据处理入列后,最终由右边的消费者获取对应监听的消息进行消耗处理。
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    

    相关文章

      网友评论

          本文标题:RabbitMQ 使用教程

          本文链接:https://www.haomeiwen.com/subject/ranykdtx.html