美文网首页
rabbitmq整合springboot

rabbitmq整合springboot

作者: 念䋛 | 来源:发表于2021-10-10 21:08 被阅读0次

    Springboot 版本为2.3.10.RELEASE
    消费端
    在整合springboot的时候,个人认为有两种方式来消息确认,一种是完全使用配置的方式,一种是部分使用配置的方式

    1. 完全使用springboot配置的方式
      application.yml
    spring:
      rabbitmq:
        host: 192.168.137.141
        port: 5672
        username: duoduo
        password: duoduo
        virtual-host: /duoduo
        #三种方式 SIMPLE() CORRELATED(执行ConfirmCallback) NONE(发送失败直接丢弃)
        publisher-confirm-type: correlated
        publisher-returns: true
        listener:
          type: direct #direct和simple
          direct:
            acknowledgeMode: manual #auto代表自动接收消息,manual手动确认消息
            prefetch: 1  #这个就是basicQos,同时处理多少条消息
            defaultRequeueRejected: true #消息拒绝是否重新入队
            retry:
              enabled: true
              maxAttempts: 3
    
    

    监听

    
    @Configuration
    public class RabbitMqConsumer {
        public static AtomicInteger count = new AtomicInteger (0);
        public static AtomicInteger count1 = new AtomicInteger (0);
    
        /**
         * @RabbitListener(queues = {"spirngboot_queue"})可以直接监听队列,前提是服务端已经创建了队列,交换机也绑定了队列
         * 也可以创建交换机 队列 比如注释掉的代码
         */
        //@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spirngboot_queue", durable = "true"),
                //exchange = @Exchange(name = "spirngboot_queue_topic_exchange", durable = "true", type = "topic"),
                //key = "springboot.#"))
        @RabbitListener(queues = {"spirngboot_queue"})
        @RabbitHandler
        public void message(@Payload User user) throws IOException {
            //这里是完全使用springboot的注解的方式,并且 acknowledgeMode: auto 要配置成auto的方式,配置成manual消息不会确定接收,
            //但是这里有一个问题,如果使用了自定义序列化之后,配置成manual也可以正常接收
            //配置文件中的retry,如果在接收消息的时候发生了异常那么会重试3次,3次之后消息就会丢弃掉,虽然配置了defaultRequeueRejected为true
            //但是如果使用了自定义序列化之后,不管defaultRequeueRejected是否为true,消息拒绝之后会重新的放到队列中
            //本人对这一地方不太了解
            //实际生产中本人也不会经常使用纯配置的方式
     try {
        System.out.println (user.getName ());
    } catch (Exception e) {
        //todo 代码的回退,如果try里面操作了数据库,可以通过事务自动的回退,但是如果操作了redis,那需要对redis 的回退等操作
        throw new IOException (e.getMessage ());
    }
    
        }
        //配合@RabbitHandler注解实现了消息的序列化格式,这样可以直接传对象,而不用吧对象转为json字符串,而且Jackson2JsonMessageConverter序列化体积更小传输更快
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory (ConnectionFactory connectionFactory){
            SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory ();
            simpleRabbitListenerContainerFactory.setConnectionFactory (connectionFactory);
            simpleRabbitListenerContainerFactory.setMessageConverter (new Jackson2JsonMessageConverter ());
            return simpleRabbitListenerContainerFactory;
        }
    }
    
    

    纯配置的方式,在监听的代码中只是接收消息就可以,不用手动的basicAck来确认消息,或者basicNack拒绝消息,纯配置的方式,对异常的捕获,之后是要继续手动抛出异常,默认重试3次,之后肯定不会重新放到队列中,本人没看过源码,看表象是这样的.而且acknowledgeMode:
    Auto 才会接收消息
    如果配置了@RabbitHandler 修改了序列化机制,上面的配置还是需要改变
    本人对纯配置的方式理解不太透彻,希望大家给我意见.

    1. 使用手动的确认信息
    @Configuration
    public class RabbitMqConsumer {
        public static Map<String, AtomicInteger> map = new HashMap<> ();
    
        /**
         * @RabbitListener(queues = {"spirngboot_queue"})可以直接监听队列,前提是服务端已经创建了队列,交换机也绑定了队列
         * 也可以创建交换机 队列 比如注释掉的代码
         */
        //@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spirngboot_queue", durable = "true"),
                //exchange = @Exchange(name = "spirngboot_queue_topic_exchange", durable = "true", type = "topic"),
                //key = "springboot.#"))
        @RabbitListener(queues = {"spirngboot_queue"})
        @RabbitHandler
        public void message( Channel channel,Message message) throws Exception {
            MessageProperties messageProperties = message.getMessageProperties ();
            if(null==map.get("spirngboot_queue"+messageProperties.getDeliveryTag ())){
                map.put ("spirngboot_queue" + messageProperties.getDeliveryTag (), new AtomicInteger ());
            }
            try {
                //todo 逻辑代码,这里要注意的是,执行这段代码是多线程的,要注意多线程安全
                System.out.println ("消息的消费"+new String(message.getBody ()));
                //模拟异常,如果是操作了sevice层的数据库,发生异常可以事务回滚,如果包含其他操作,需要在catch中将操作回退,因为是多线程,要考虑线程安全问题
                int i = 1 / 0;
                //如果消费成功,则确认消息
                channel.basicAck (messageProperties.getDeliveryTag (), false);
            } catch (Exception e) {
                //利用messageProperties.getDeliveryTag ()得到消息唯一的id,判断重复消息了几次,如果超过3次把最后的true改为false,将消息不重新放到队列中
                //因为可能是分布式系统,可以使用reids来判断消息消费了几次,
                //如果操过了3次,从redis中将messageProperties.getDeliveryTag ()删除即可
                if(map.get("spirngboot_queue"+messageProperties.getDeliveryTag ()).addAndGet (1)<3){//判断消息失败消费了几次,如果操过能容忍的最大次数后将消息丢弃,这里可以使用死心队列接收失败的消息
                    System.out.println ("消息消费失败,重新放到队列中,失败消费次数"+map.get("spirngboot_queue"+messageProperties.getDeliveryTag ()).get ());
                    channel.basicNack (messageProperties.getDeliveryTag (), false, true);
                }else{
                    System.out.println ("消息消费失败,超过3次,丢弃消息,可以放到死心队列中");
                    channel.basicNack (messageProperties.getDeliveryTag (), false, false);
                }
            }
        }
        //配合@RabbitHandler注解实现了消息的序列化格式,这样可以直接传对象,而不用吧对象转为json字符串,而且Jackson2JsonMessageConverter序列化体积更小传输更快
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory (ConnectionFactory connectionFactory){
            SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory ();
            simpleRabbitListenerContainerFactory.setConnectionFactory (connectionFactory);
            simpleRabbitListenerContainerFactory.setMessageConverter (new Jackson2JsonMessageConverter ());
            return simpleRabbitListenerContainerFactory;
        }
    }
    
    

    生产者 这里使用的springboot 2.5.0版本
    配置类

    
    /**
     * 保证消息的确认发送有三种方式,事务,Confirms和异步监听的方式
     * 1.使用监听就是下面rabbitTemplateListener方式
     * 2.Confirms方式为Web类的rabbitMq方法,Confirms可以统一确认和单调确认,实例中为统一确认
     * 3.还有一种方式就是事务的方式,由于效率很低,一般很少使用,这里没有做介绍
     * 效率上监听的效率要高于Confirms,实际生产上也是建议使用监听的方式
     */
    @Configuration
    public class RabbitMqConfiguration {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        /**
         * 下面是项目启动的时候创建了队列 交换机 并将队列和交换机绑定在一起
         * 定义交换机的名字
         */
        public static final String EXCHANGE_NAME = "spirngboot_queue_topic_exchange";
        /**
         * 定义队列的名字
         */
        public static final String QUEUE_NAME = "spirngboot_queue";
    
        /**
         * 声明交换机
         */
        @Bean("bootExchange")
        public Exchange bootExchange() {
            return ExchangeBuilder.topicExchange (EXCHANGE_NAME).durable (true).build ();
        }
    
        /**
         * 声明队列
         */
        @Bean("bootQueue")
        public Queue bootQueue() {
            //指定队列的同时也指定了队列的最大优先级,发送消息的时候也要指定消息的优先级,rabbitmq 的管理页面的queue会有一个pri的标识
            return QueueBuilder.durable (QUEUE_NAME).maxPriority (5).build ();
        }
    
        /**
         * 队列与交换机进行绑定
         */
        @Bean
        public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
            return BindingBuilder.bind (queue).to (exchange).with ("springboot.#").noargs ();
        }
    
        @PostConstruct
        public void rabbitTemplateListener() {
            //设置confirmeCallback 需要在配置文件中加publisher-confirm-type: correlated
            rabbitTemplate.setConfirmCallback (new RabbitTemplate.ConfirmCallback () {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    //ack 为  true表示 消息已经到达交换机,此时消息并没有到达队列
                    System.out.println (correlationData);
                    if (ack) {
                        //交换价接收消息成功 cause为null
                        System.out.println ("交换机接收成功消息");
                    } else {
                        //接收失败
                        System.out.println ("交换机接收失败消息" + cause);
                        //做一些处理,让消息再次发送。
                    }
                }
            });
            //启动return机制的两种方式
            /**1.配置文件中 publisher-returns: true
             * 2.rabbitTemplate.setMandatory (true);
             */
            //定义回调,交换机是否到达队列,发生在ack成功之后
            rabbitTemplate.setReturnsCallback (new RabbitTemplate.ReturnsCallback () {
                @Override
                public void returnedMessage(ReturnedMessage returned) {
                    //获取交换机
                    System.out.println ("获取交换机" + returned.getExchange ());
                    //获取消息对象
                    System.out.println ("获取消息对象" + returned.getMessage ());
                    //获取错误码
                    System.out.println ("获取错误码" + returned.getReplyCode ());
                    //获取错误信息
                    System.out.println ("获取错误信息" + returned.getReplyText ());
                    //获取路由key
                    System.out.println ("获取路由key" + returned.getRoutingKey ());
                }
            });
        }
    }
    
    

    发送消息

    @Controller
    @ResponseBody
    public class Web {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @RequestMapping("rabbitMq")
        public void rabbitMq() {
            //直接发送消息
    //        rabbitTemplate.convertAndSend ("spirngboot_queue_topic_exchange", "springboot1.HelloRabbitMq", "HelloRabbitMq" );
            //使用了Jackson2JsonMessageConverter序列化,发送消息可以直接发送对象,而不是字符串
    //        rabbitTemplate.setMessageConverter (new Jackson2JsonMessageConverter ());
            //将消息设置优先级
            Boolean invoke = rabbitTemplate.invoke ((ops) -> {
                for (int i = 0; i < 2; i++) {
                    User user = new User ("zhangsan"+i);
                    //偶数为优先级高的消息,接收消息的时候会发现偶数的消息比奇数的消息更早的被消费
                    if (i % 2 == 0) {
                        ops.convertAndSend ("spirngboot_queue_topic_exchange", "springboot.HelloRabbitMq", user.toString (),
                                message -> {
                                    //设置优先级为5的消息,0最小 255最大
                                    message.getMessageProperties ().setPriority (5);
                                    return message;
                                });
                    } else {
                        ops.convertAndSend ("spirngboot_queue_topic_exchange", "springboot.HelloRabbitMq", user.toString (),
                                message -> {
                                    message.getMessageProperties ().setPriority (0);
                                    return message;
                                });
                    }
                }
                //如果2000毫秒之内没有收到服务端的确认消息,下面的invoke为false,此方法为阻塞方法
                return rabbitTemplate.waitForConfirms (2000);
            });
            if (invoke) {
                System.out.println ("消息发送成功-----");
            } else {
                System.out.println ("消息发送失败-----");
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:rabbitmq整合springboot

          本文链接:https://www.haomeiwen.com/subject/oihkoltx.html