美文网首页
Spring cloud stream 消费错误处理策略(一):

Spring cloud stream 消费错误处理策略(一):

作者: 纯是不纯 | 来源:发表于2021-07-01 15:39 被阅读0次

    重试策略

    重试策略就是消费失败后隔一段时间我在消费,这种方案一般是硬卧环境因素导致的失败情况,或者网络问题导致的消费失败,重试消费可能解决上述问题。

    配置参数

    重试策略是通过配置参数实现的,参数前缀是spring.cloud.strema.bingings.<bindingName>.consumer. ,具体配置参数如下:

    maxAttempts: 3 #对输入通道消息处理的最大重试次数,默认是3 次。

    backOffMaxInterval: 10000 #重试消息处理的最大时间间隔

    backOffInitialInterval: 1000 #重试消息处理的初始间隔时间。

    通过配置上述参数就能使使重试策略生效。

    代码实践

    ```

    package com.dy.producer;

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.boot.SpringApplication;

    import org.springframework.boot.autoconfigure.SpringBootApplication;

    import org.springframework.cloud.stream.annotation.EnableBinding;

    import org.springframework.cloud.stream.annotation.Input;

    import org.springframework.cloud.stream.annotation.Output;

    import org.springframework.cloud.stream.annotation.StreamListener;

    import org.springframework.integration.support.MessageBuilder;

    import org.springframework.messaging.MessageChannel;

    import org.springframework.messaging.SubscribableChannel;

    import org.springframework.stereotype.Component;

    import org.springframework.web.bind.annotation.GetMapping;

    import org.springframework.web.bind.annotation.RequestParam;

    import org.springframework.web.bind.annotation.RestController;

    @EnableBinding(TestApplication.TestTopic.class)

    @SpringBootApplication

    public class TestApplication {

    private static final Loggerlog= LoggerFactory.getLogger(TestApplication.class);

        public static void main(String[] args) {

    SpringApplication.run(TestApplication.class, args);

        }

    @RestController

        static class TestController {

    @Autowired

            private TestTopictestTopic;

            /**

            * 消息生产接口

            * @param message

            * @return

            */

            @GetMapping("/sendMessage")

    public StringmessageWithMQ(@RequestParam String message) {

    testTopic.output().send(MessageBuilder.withPayload(message).build());

                return "ok";

            }

    }

    /**

        * 消息消费逻辑

        */

        @Component

        static class TestListener {

    @StreamListener(TestTopic.INPUT)

    public void receive(String payload) {

    log.info("Received: " + payload);

                throw new RuntimeException("Message consumer failed!");

            }

    }

    interface TestTopic {

    StringOUTPUT ="example-topic-output";

            StringINPUT ="example-topic-input";

            @Output(OUTPUT)

    MessageChanneloutput();

            @Input(INPUT)

    SubscribableChannelinput();

        }

    }

    配置参数

    相关文章

      网友评论

          本文标题:Spring cloud stream 消费错误处理策略(一):

          本文链接:https://www.haomeiwen.com/subject/scvjultx.html