- Spring cloud stream 消费错误处理策略(二):
- Spring cloud stream 消费错误处理策略(三):
- Spring cloud stream 消费错误处理策略(一):
- Spring Cloud Stream介绍-Spring Clo
- Spring Cloud Stream消费失败后的处理策略(四)
- 30. Spring Cloud Alibaba之消息中间件 -
- Spring Cloud Alibaba之消息中间件 - Spr
- 08基于Stream的消息中间件绑定
- Spring Cloud Stream消费失败后的处理策略(二)
- 3.《kafka》SpringCloud学习之SpringClo
自定义的错误处理
自定义错误处理适用于事先知道可能发生哪类错误、并且业务上需要对失败消息做特殊处理的场景,比如把错误数据保存到另一个 topic,或者只是记录到日志中。自定义错误处理不需要额外的配置参数,但需要在处理方法上增加 @ServiceActivator 注解,并通过该注解绑定到对应的错误通道。
例如:@ServiceActivator(inputChannel = "iris.test-19.errors")。其中 inputChannel 由 destination、group 和固定后缀 errors 三部分构成,中间以点号隔开(本例中 destination 为 iris,group 为 test-19)。
代码实践
package com.dy.producer;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
private static final Logger log= LoggerFactory.getLogger(TestApplication.class);
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
@RestController
static class TestController {
@Autowired
private TestTopic testTopic;
/**
* 消息生产接口
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}
}
/**
* 消息消费逻辑
*/
@Component
static class TestListener {
@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received: " + payload);
throw new RuntimeException("Message consumer failed!");
}
//destination+group+errors
@ServiceActivator(inputChannel = "iris.test-19.errors")
public void handleError(ErrorMessage errorMessage) {
log.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
log.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
log.error("[handleError][headers:{}]", errorMessage.getHeaders());
}
//全局错误处理
@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
public void globalHandleError(ErrorMessage errorMessage) {
log.error("[globalHandleError][payload:{}]"+ ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
log.error("[globalHandleError][originalMessage:{}]"+errorMessage.getOriginalMessage());
log.error("[globalHandleError][headers:{}]"+errorMessage.getHeaders());
}
}
interface TestTopic {
String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
}
}
具体配置请看我上篇写的配置参数一栏
网友评论