美文网首页
Spring cloud stream 消费错误处理策略(一):

Spring cloud stream 消费错误处理策略(一):

作者: 纯是不纯 | 来源:发表于2021-07-01 15:39 被阅读0次

重试策略

重试策略就是消费失败后隔一段时间我在消费,这种方案一般是硬卧环境因素导致的失败情况,或者网络问题导致的消费失败,重试消费可能解决上述问题。

配置参数

重试策略是通过配置参数实现的,参数前缀是spring.cloud.strema.bingings.<bindingName>.consumer. ,具体配置参数如下:

maxAttempts: 3 #对输入通道消息处理的最大重试次数,默认是3 次。

backOffMaxInterval: 10000 #重试消息处理的最大时间间隔

backOffInitialInterval: 1000 #重试消息处理的初始间隔时间。

通过配置上述参数就能使使重试策略生效。

代码实践

```

package com.dy.producer;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.cloud.stream.annotation.EnableBinding;

import org.springframework.cloud.stream.annotation.Input;

import org.springframework.cloud.stream.annotation.Output;

import org.springframework.cloud.stream.annotation.StreamListener;

import org.springframework.integration.support.MessageBuilder;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.SubscribableChannel;

import org.springframework.stereotype.Component;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestParam;

import org.springframework.web.bind.annotation.RestController;

@EnableBinding(TestApplication.TestTopic.class)

@SpringBootApplication

public class TestApplication {

private static final Loggerlog= LoggerFactory.getLogger(TestApplication.class);

    public static void main(String[] args) {

SpringApplication.run(TestApplication.class, args);

    }

@RestController

    static class TestController {

@Autowired

        private TestTopictestTopic;

        /**

        * 消息生产接口

        * @param message

        * @return

        */

        @GetMapping("/sendMessage")

public StringmessageWithMQ(@RequestParam String message) {

testTopic.output().send(MessageBuilder.withPayload(message).build());

            return "ok";

        }

}

/**

    * 消息消费逻辑

    */

    @Component

    static class TestListener {

@StreamListener(TestTopic.INPUT)

public void receive(String payload) {

log.info("Received: " + payload);

            throw new RuntimeException("Message consumer failed!");

        }

}

interface TestTopic {

StringOUTPUT ="example-topic-output";

        StringINPUT ="example-topic-input";

        @Output(OUTPUT)

MessageChanneloutput();

        @Input(INPUT)

SubscribableChannelinput();

    }

}

配置参数

相关文章

网友评论

      本文标题:Spring cloud stream 消费错误处理策略(一):

      本文链接:https://www.haomeiwen.com/subject/scvjultx.html