Spring Cloud Stream consumer error handling strategies (2)

Author: 纯是不纯 | Published: 2021-07-02 08:59

Custom error handling

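Custom error handling is meant for failures you can anticipate and that need business-specific treatment of the failed message, for example forwarding the bad record to another topic or simply writing it to the log. It requires no extra configuration parameters, but you do need to add a method annotated with @ServiceActivator and bind that annotation to the error channel:
@ServiceActivator(inputChannel = "iris.test-19.errors")
Here inputChannel is built from the destination, the group, and the literal errors, joined by dots. In this example the destination is iris and the consumer group is test-19.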
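
The naming rule above can be written out as a small helper, which is handy for checking that the string passed to inputChannel matches the binding. This is only a sketch in plain Java with no Spring dependency. The class ErrorChannelNames and the method errorChannelName are hypothetical names made up for illustration; they are not part of Spring Cloud Stream.

```java
// Hypothetical helper (not a Spring Cloud Stream API): assembles the
// error channel name as <destination>.<group>.errors.
final class ErrorChannelNames {

    private ErrorChannelNames() {
        // utility class, no instances
    }

    static String errorChannelName(String destination, String group) {
        // Both parts are required by the naming rule described in the text.
        if (destination == null || destination.isEmpty()) {
            throw new IllegalArgumentException("destination must not be empty");
        }
        if (group == null || group.isEmpty()) {
            throw new IllegalArgumentException("group must not be empty");
        }
        return destination + "." + group + ".errors";
    }

    public static void main(String[] args) {
        // Values taken from the example in this post.
        System.out.println(errorChannelName("iris", "test-19"));
    }
}
```

Because an annotation attribute must be a compile-time constant, the value still has to be written as a literal (or a constant) in @ServiceActivator; a helper like this is more useful in a test that guards against the literal drifting from the configured destination and group.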

Code example

package com.dy.producer;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
    private static final Logger log = LoggerFactory.getLogger(TestApplication.class);
    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
    @RestController
    static class TestController {
        @Autowired
        private TestTopic testTopic;
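        /**
         * Message-producing endpoint.
         * @param message payload to send to the output channel
         * @return "ok" once the message has been handed to the channel
         */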
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).build());
            return "ok";
        }

    }
    /**
     * Message consumption logic.
     */
    @Component
    static class TestListener {

        @StreamListener(TestTopic.INPUT)
        public void receive(String payload) {
            log.info("Received: " + payload);
            throw new RuntimeException("Message consumer failed!");
        }
        // Binding-specific error channel: <destination>.<group>.errors
        @ServiceActivator(inputChannel = "iris.test-19.errors")
        public void handleError(ErrorMessage errorMessage) {
            log.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
            log.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
            log.error("[handleError][headers:{}]", errorMessage.getHeaders());
        }
        // Global error handling
        @StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
        public void globalHandleError(ErrorMessage errorMessage) {
            // Pass values as arguments so the {} placeholders are actually filled in.
            log.error("[globalHandleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
            log.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
            log.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
        }
    }
    interface TestTopic {
        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";
        @Output(OUTPUT)
        MessageChannel output();
        @Input(INPUT)
        SubscribableChannel input();
    }

}

For the configuration itself, see the "configuration parameters" section of my previous post.
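
For orientation only, the binding settings that would produce the error channel name iris.test-19.errors might look roughly like the fragment below. The destination and group values are inferred from the channel name used in the code; the output destination is an assumption (it is set to the same topic so that messages sent through /sendMessage reach the listener). The actual values are in the previous post, and binder-specific settings are omitted.

```properties
# Assumed values, inferred from "iris.test-19.errors"; not copied from the previous post.
spring.cloud.stream.bindings.example-topic-input.destination=iris
spring.cloud.stream.bindings.example-topic-input.group=test-19
# Assumption: the producer writes to the same destination the consumer reads from.
spring.cloud.stream.bindings.example-topic-output.destination=iris
```

Note that the group is set explicitly here: without a configured group the error channel name would not contain test-19.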
