美文网首页
SpringCloud整合RabbitMQ出现AmqpExcep

SpringCloud整合RabbitMQ出现AmqpExcep

作者: 尼小摩 | 来源:发表于2020-04-09 14:37 被阅读0次

    踩坑记录

    最近在使用SpringCloud架构一个推荐召回微服务,但是在集成RabbitMQ时就发现了无数个坑。于是总结了这篇文章供各位大侠围观和嘲笑

    SpringBoot集成RabbitMQ版本

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.0.3.RELEASE</version>
    </dependency>
    

    RabbitMQ

    因为公司业务方使用的RabbitMQ消息队列发送数据,作为消费方的我自然也就需要使用啦。猫了一眼spring boot的官方说明,上面说spring boot为rabbit准备了spring-boot-starter-amqp,并且为RabbitTemplate和RabbitMQ提供了自动配置选项。
    官方文档:https://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/htmlsingle/#boot-features-rabbitmq 也有例子。

    开干~~。

    踩坑

    消费代码是这样的:

    /**
     * <p> 猜你喜欢 </p>
     *
     * @Author: fc.w
     * @Date: 2020/4/9 10:23
     */
    
    @Component
    @Slf4j
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.queue}", autoDelete = "false"),
                    exchange = @Exchange(value = "${mq.exchageName}", delayed = "true", type = ExchangeTypes.TOPIC, arguments = @Argument(name = "x-delayed-type",value="topic")),
                    key = "${mq.routingKey}"
            )
    )
    public class ProductInventoryConsumer {
    
        @RabbitHandler
        public void process(String msg) {
            System.out.println("Info..........receiver: " + msg);
        }
    
    }
    
    

    MQ 配置信息

    mq.exchageName = cl.topic.delayed
    mq.routingKey = cl.product.updateCoreInfoModify
    mq.queue = mlp-product-guessInventoryQueue-delayed
    

    消费信息后,应该记录一条日志。
    结果得到只有org.springframework.amqp.AmqpException: No method found for class [B 这个异常,并且还无限循环抛出这个异常。。。

    2020-04-09 13:49:21.717 [SimpleAsyncTaskExecutor-1] [WARN ] o.s.a.r.l.ConditionalRejectingErrorHandler.log:88 []  - Execution of Rabbit message listener failed.
    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1506)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1417)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:840)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:824)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:79)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1051)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.springframework.amqp.AmqpException: No method found for class [B
        at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:147)
        at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:250)
        at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:70)
        at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
        at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
        ... 8 common frames omitted
    

    官方文档解释了无限循环的原因:

    知道了为啥会无限重试了,下面来看看为啥会抛出这个异常.
    Issues上大神的解答: https://jira.spring.io/browse/AMQP-573

    进去看完问题和大神的解答,豁然开朗。

    There are two conversions in the @RabbitListener pipeline.
    The first converts from a Spring AMQP Message to a spring-messaging Message.
    There is currently no way to change the first converter from SimpleMessageConverter which handles String, Serializable and passes everything else as byte[].
    The second converter converts the message payload to the method parameter type (if necessary).
    With method-level @RabbitListeners there is a tight binding between the handler and the method.
    With class-level @RabbitListener s, the message payload from the first conversion is used to select which method to invoke. Only then, is the argument conversion attempted.
    This mechanism works fine with Java Serializable objects since the payload has already been converted before the method is selected.
    However, with JSON, the first conversion returns a byte[] and hence we find no matching @RabbitHandler.
    We need a mechanism such that the first converter is settable so that the payload is converted early enough in the pipeline to select the appropriate handler method.
    A ContentTypeDelegatingMessageConverter is probably most appropriate.
    And, as stated in AMQP-574, we need to clearly document the conversion needs for a @RabbitListener, especially when using JSON or a custom conversion.
    

    总之就是说这种类型推断只适用于方法级别的@RabbitListener。

    于是修改代码:

    @Component
    @Slf4j
    public class ProductInventoryConsumer {
    
        @RabbitHandler
        @RabbitListener(
                bindings = @QueueBinding(
                        value = @Queue(value = "${mq.queue}", autoDelete = "false"),
                        exchange = @Exchange(value = "${mq.exchageName}", delayed = "true", type = ExchangeTypes.TOPIC, arguments = @Argument(name = "x-delayed-type",value="topic")),
                        key = "${mq.routingKey}"
                )
        )
        public void process(String msg) {
            System.out.println("Info..........receiver: " + msg);
        }
    
    }
    

    项目启动正常,问题解决。。。。

    发消息测试

    消息打印:

    Info..........receiver: 123,13,10,34,112,105,100,34,58,32,34,84,82,45,71,89,45,69,88,67,69,76,76,69,78,67,69,124,49,34,44,13,10,34,112,105,100,115,34,58,91,34,65,80,45,66,88,74,71,45,66,80,90,83,124,55,34,93,44,13,10,34,116,121,112,101,34,58,32,34,85,112,115,101,114,116,80,114,111,100,117,99,116,34,44,13,10,34,100,97,116,97,34,58,32,123,13,10,32,32,32,32,34,111,110,115,97,108,101,34,58,32,116,114,117,101,44,13,10,32,32,32,32,34,105,115,115,104,111,119,34,58,32,116,114,117,101,13,10,32,32,32,32,32,125,13,10,125

    rabbitTemplate.convertAndSend发送的消息默认带有消息头


    而amqp-client发送的消息默认是不带消息头的

    消息订阅正常

    Info..........receiver: {
    "pid": "TR-GY-EXCELLENCE|1",
    "pids":["AP-BXJG-BPZS|7"],
    "type": "UpsertProduct",
    "data": {
    "onsale": true,
    "isshow": true
    }
    }

    相关文章

      网友评论

          本文标题:SpringCloud整合RabbitMQ出现AmqpExcep

          本文链接:https://www.haomeiwen.com/subject/sbhcmhtx.html