配置文件
# RabbitMQ connection used by spring-amqp (e.g. @RabbitListener) and the
# Spring Cloud Stream binder/bindings for the resource-recycle messages.
spring:
  rabbitmq:
    host: <ip>
    port: <port>
    username: <user-name>
    password: <pwd>
    virtual-host: <virtual-host>
  cloud:
    stream:
      binders:
        # Named binder with its own RabbitMQ connection settings.
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <ip>
                port: <port>
                username: <user-name>
                password: <pwd>
                virtual-host: <virtual-host>
      bindings:
        # Producer side: publishes to the cloudlab.resrecycle destination.
        resourceRecycleOutput:
          destination: cloudlab.resrecycle
          # Per-binding binder selection uses the `binder` key;
          # `default-binder` is only meaningful at spring.cloud.stream level.
          binder: defaultRabbit
          group: gk8s
        # Consumer side: same destination, consumer group gk8s.
        resourceRecycleInput:
          destination: cloudlab.resrecycle
          binder: defaultRabbit
          group: gk8s
      rabbit:
        bindings:
          resourceRecycleOutput:
            producer:
              # Declare the exchange as a delayed exchange so the x-delay header is honoured.
              delayedExchange: true
          resourceRecycleInput:
            consumer:
              delayedExchange: true
              # Declare a dead-letter queue for this consumer group and
              # republish failed messages to it.
              autoBindDlq: true
              republishToDlq: true
              requeueRejected: true
基础配置
/**
 * Messaging configuration that contributes a JSON message converter bean.
 */
@Configuration
public class MqConfig {

    /**
     * Creates the message converter bean.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter getMessageConverter() {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }
}
consumer消费者
channel
/**
 * Binding interface declaring the inbound channel for k8s resource-recycle messages.
 *
 * <p>Not annotated with {@code @Component}: component scanning does not instantiate
 * interfaces, and this type is referenced from {@code @EnableBinding} instead.
 */
public interface ResourceRecycleInputChannelProcessor {

    /**
     * Name of the input binding for k8s resource-recycle messages.
     */
    String RESOURCE_RECYCLE_INPUT = "resourceRecycleInput";

    /**
     * Declares the input channel bound under {@link #RESOURCE_RECYCLE_INPUT}.
     *
     * @return the subscribable channel for incoming resource-recycle messages
     */
    @Input(RESOURCE_RECYCLE_INPUT)
    SubscribableChannel resourceRecycleInput();
}
listener队列监听
@Slf4j
@EnableBinding({ResourceRecycleInputChannelProcessor.class})
public class ResourceRecycleListener {
/**
* 系统k8s资源回收
* 失败后重试3次, 可配置重试次数
* @param message
* @throws AmqpRejectAndDontRequeueException
*/
@StreamListener(ResourceRecycleInputChannelProcessor.RESOURCE_RECYCLE_INPUT) public void recycleResources(String message) throws AmqpRejectAndDontRequeueException{
}
listener/dlq死信队列监听
/**
 * Listener for the dead-letter queue of the resource-recycle consumer.
 */
@Slf4j
@Component
public class ResourceRecycleDlqListener {

    /**
     * Dead-letter queue name. It is the configured destination
     * ({@code cloudlab.resrecycle}) and group ({@code gk8s}) joined by a dot, followed by
     * {@code .dlq}; presumably this is the queue the binder declares for
     * {@code autoBindDlq} - confirm against the broker.
     *
     * <p>Declared {@code static final} so it is a true class-level constant, as its
     * use in the annotation below and its naming both imply.
     */
    private static final String QUEUE_NAME = "cloudlab.resrecycle.gk8s.dlq";

    /**
     * Consumes a dead-lettered message.
     *
     * @param message the dead-lettered message payload
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void consumer(String message) {
        // The handler body is empty in the source as published.
    }
}
producer生产者
channel
/**
 * Binding interface declaring the outbound channel for k8s resource-recycle messages.
 *
 * <p>Not annotated with {@code @Component}: component scanning does not instantiate
 * interfaces, and this type is referenced from {@code @EnableBinding} instead.
 */
public interface ResourceRecycleOutputChannelProcessor {

    /**
     * Name of the output binding for k8s resource-recycle messages.
     */
    String RESOURCE_RECYCLE_OUTPUT = "resourceRecycleOutput";

    /**
     * Declares the output channel bound under {@link #RESOURCE_RECYCLE_OUTPUT}.
     *
     * @return the message channel used to publish resource-recycle messages
     */
    @Output(RESOURCE_RECYCLE_OUTPUT)
    MessageChannel resourceRecycleOutput();
}
sender
@Slf4j
@EnableBinding(value = {ResourceRecycleOutputChannelProcessor.class})
public class ResourceRecycleProducer extends BaseProducer implements ProducerService {
@Autowired
@Output(ResourceRecycleOutputChannelProcessor.RESOURCE_RECYCLE_OUTPUT)
private MessageChannel channel;
/**
* 消息发送
* @param message
*/
@Override
public R<String> send(MessageModel message) {
try {
boolean result = channel.send(
MessageBuilder
.withPayload(JSON.toJSONString(message.getBody()))
.setHeaders(
this.buildMessageHeader(
message.getHeaders(),
message.isEnableDelay(),
message.getDelay()
)
)
.build()
);
return R.ok();
} catch(Exception e) {
return R.fail();
}
}
}
/**
 * Contract for message producers.
 */
public interface ProducerService {
    /**
     * Sends a message.
     *
     * @param messge the message model to send
     * @return the result wrapper describing the outcome of the send
     */
    R<String> send(MessageModel messge);
}
/**
 * Base class for producers; builds message headers including the optional
 * {@code x-delay} header.
 */
public abstract class BaseProducer {

    /** Header name carrying the delay value. */
    private static final String DELAY_HEADER = "x-delay";

    /**
     * Operation applied to a header map together with a delay value.
     */
    @FunctionalInterface
    interface DefFunction {
        void exec(Map<String, Object> v1, Long v2);
    }

    /** Puts the delay value under {@code x-delay}. */
    private static final DefFunction APPLY_DELAY = (headers, delay) -> headers.put(DELAY_HEADER, delay);

    /** Removes any {@code x-delay} entry. */
    private static final DefFunction CLEAR_DELAY = (headers, delay) -> headers.remove(DELAY_HEADER);

    /**
     * Builds the message header accessor from the given headers and delay settings.
     *
     * <p>The caller's map is never modified: the entries are copied first, so
     * unmodifiable maps and {@code null} are both accepted. When {@code isDelay} is
     * {@code true} the {@code x-delay} header is set to {@code delay}; otherwise
     * (including {@code null}) any {@code x-delay} entry is dropped.
     *
     * @param headers custom headers to copy; may be {@code null}
     * @param isDelay whether a delayed message is requested; {@code null} means no delay
     * @param delay   the value for the {@code x-delay} header when delay is enabled
     * @return an accessor holding the resulting headers
     */
    protected MessageHeaderAccessor buildMessageHeader(Map<String, Object> headers, Boolean isDelay, Long delay) {
        // Work on a private copy so the argument is neither dereferenced when null
        // nor mutated as a side effect.
        Map<String, Object> effectiveHeaders = headers == null ? new HashMap<>() : new HashMap<>(headers);

        // Boolean.TRUE.equals avoids unboxing a null flag.
        DefFunction delayStep = Boolean.TRUE.equals(isDelay) ? APPLY_DELAY : CLEAR_DELAY;
        delayStep.exec(effectiveHeaders, delay);

        MessageHeaderAccessor msgHeaderAccessor = new MessageHeaderAccessor();
        effectiveHeaders.forEach(msgHeaderAccessor::setHeader);
        return msgHeaderAccessor;
    }
}
网友评论