重试策略
重试策略就是消费失败后隔一段时间我在消费,这种方案一般是硬卧环境因素导致的失败情况,或者网络问题导致的消费失败,重试消费可能解决上述问题。
配置参数
重试策略是通过配置参数实现的,参数前缀是spring.cloud.strema.bingings.<bindingName>.consumer. ,具体配置参数如下:
maxAttempts: 3 #对输入通道消息处理的最大重试次数,默认是3 次。
backOffMaxInterval: 10000 #重试消息处理的最大时间间隔
backOffInitialInterval: 1000 #重试消息处理的初始间隔时间。
通过配置上述参数就能使使重试策略生效。
代码实践
```
package com.dy.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
private static final Loggerlog= LoggerFactory.getLogger(TestApplication.class);
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
@RestController
static class TestController {
@Autowired
private TestTopictestTopic;
/**
* 消息生产接口
* @param message
* @return
*/
@GetMapping("/sendMessage")
public StringmessageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}
}
/**
* 消息消费逻辑
*/
@Component
static class TestListener {
@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received: " + payload);
throw new RuntimeException("Message consumer failed!");
}
}
interface TestTopic {
StringOUTPUT ="example-topic-output";
StringINPUT ="example-topic-input";
@Output(OUTPUT)
MessageChanneloutput();
@Input(INPUT)
SubscribableChannelinput();
}
}
网友评论