美文网首页
SpringCloud Stream 集成 rabbitMQ

SpringCloud Stream 集成 rabbitMQ

作者: _micang | 来源:发表于2021-12-21 09:17 被阅读0次

配置文件

spring:
  rabbitmq:
    host: <ip>
    port: <port>
    username: <user-name>
    password: <pwd>
    virtual-host: <virtual-host>
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <ip>
                port: <port>
                username: <user-name>
                password: <pwd>
                virtual-host: <virtual-host>
      bindings:
        resourceRecycleOutput:
          destination: cloudlab.resrecycle
          default-binder: defaultRabbit
          group: gk8s
        resourceRecycleInput:
          destination: cloudlab.resrecycle
          default-binder: defaultRabbit
          group: gk8s
      rabbit:
        bindings:
          resourceRecycleOutput:
            producer:
              delayedExchange: true
          resourceRecycleInput:
            consumer:
              delayedExchange: true
              autoBindDlq: true
              republishToDlq: true
              requeueRejected: true

基础配置

@Configuration
public class MqConfig {    
    @Bean    
    public MessageConverter getMessageConverter() {        
        return new Jackson2JsonMessageConverter();    
    }
}

consumer消费者

channel
@Component
public interface ResourceRecycleInputChannelProcessor {    
    /**     
    * k8s资源回收输出     
    */    
    String RESOURCE_RECYCLE_INPUT = "resourceRecycleInput";   
    
    @Input(RESOURCE_RECYCLE_INPUT)    
    SubscribableChannel resourceRecycleInput();
}
listener队列监听
@Slf4j
@EnableBinding({ResourceRecycleInputChannelProcessor.class})
public class ResourceRecycleListener {
    /**
     * 系统k8s资源回收
     * 失败后重试3次, 可配置重试次数
     * @param message
     * @throws AmqpRejectAndDontRequeueException
     */
    @StreamListener(ResourceRecycleInputChannelProcessor.RESOURCE_RECYCLE_INPUT)            public void recycleResources(String message) throws AmqpRejectAndDontRequeueException{
}
listener/dlq死信队列监听
@Slf4j
@Component
public class ResourceRecycleDlqListener {
    private final String QUEUE_NAME = "cloudlab.resrecycle.gk8s.dlq";
    /**     
     * 死信消费     
     * @param message     
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void consumer(String message)  {
    }
}

producer生产者

channel
@Component
public interface ResourceRecycleOutputChannelProcessor {    
    /**     
     * k8s资源回收消息输入     
     */    
    String RESOURCE_RECYCLE_OUTPUT = "resourceRecycleOutput";    
    @Output(RESOURCE_RECYCLE_OUTPUT)    
    MessageChannel resourceRecycleOutput();
}
sender
@Slf4j
@EnableBinding(value = {ResourceRecycleOutputChannelProcessor.class})
public class ResourceRecycleProducer extends BaseProducer implements ProducerService {    
    @Autowired    
    @Output(ResourceRecycleOutputChannelProcessor.RESOURCE_RECYCLE_OUTPUT)    
    private MessageChannel channel;    
    /**     
    * 消息发送     
    * @param message     
    */    
    @Override    
    public R<String> send(MessageModel message) {        
        try {            
            boolean result = channel.send(
                MessageBuilder                      
                    .withPayload(JSON.toJSONString(message.getBody()))                  
                    .setHeaders(
                        this.buildMessageHeader(                            
                            message.getHeaders(),                            
                            message.isEnableDelay(),                            
                            message.getDelay()                    
                        )
                    )                    
                    .build()            
            );            
            return R.ok();        
        } catch(Exception e) {            
            return R.fail();        
        }    
    }
}
public interface ProducerService {    
    /**     
     * 发送消息     
     *     
     * @param messge     
     * @return     
     */     
     R<String> send(MessageModel messge);
}
public abstract class BaseProducer {    
    @FunctionalInterface    
    interface DefFunction {        
        void exec(Map<String, Object> v1, Long v2);    
    }    
    /**     
     * 根据配置设置delay值     
     */    
    private Map<Boolean, DefFunction> setDelay = new HashMap<Boolean, DefFunction>(){{   
        put(true, (headers, delay) -> headers.put("x-delay", delay));        
        put(false, (headers, delay) -> headers.remove("x-delay"));    
    }};    
    /**     
     * 构建消息头部参数信息     
     *     
     * @param headers     
     * @return     
     */    
    protected MessageHeaderAccessor buildMessageHeader(Map<String, Object> headers, Boolean isDelay, Long delay) {        
        MessageHeaderAccessor msgHeaderAccessor = new MessageHeaderAccessor();        
        setDelay.get(isDelay).exec(headers, delay);        
        // 构造header,判断是否需要发送延时消息        
        Optional.ofNullable(headers).ifPresent(                
            hMap -> hMap.forEach((k, v) -> msgHeaderAccessor.setHeader(k, v))        
        );        
        return msgHeaderAccessor;    
    }
}
延时队列

rabbitmq_delayed_message_exchange 插件官方地址

相关文章

网友评论

      本文标题:SpringCloud Stream 集成 rabbitMQ

      本文链接:https://www.haomeiwen.com/subject/qdgtqrtx.html