美文网首页
SpringCloud Stream 集成 rabbitMQ

SpringCloud Stream 集成 rabbitMQ

作者: _micang | 来源:发表于2021-12-21 09:17 被阅读0次

    配置文件

    spring:
      rabbitmq:
        host: <ip>
        port: <port>
        username: <user-name>
        password: <pwd>
        virtual-host: <virtual-host>
      cloud:
        stream:
          binders:
            defaultRabbit:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    host: <ip>
                    port: <port>
                    username: <user-name>
                    password: <pwd>
                    virtual-host: <virtual-host>
          bindings:
            resourceRecycleOutput:
              destination: cloudlab.resrecycle
              default-binder: defaultRabbit
              group: gk8s
            resourceRecycleInput:
              destination: cloudlab.resrecycle
              default-binder: defaultRabbit
              group: gk8s
          rabbit:
            bindings:
              resourceRecycleOutput:
                producer:
                  delayedExchange: true
              resourceRecycleInput:
                consumer:
                  delayedExchange: true
                  autoBindDlq: true
                  republishToDlq: true
                  requeueRejected: true
    

    基础配置

    @Configuration
    public class MqConfig {    
        @Bean    
        public MessageConverter getMessageConverter() {        
            return new Jackson2JsonMessageConverter();    
        }
    }
    

    consumer消费者

    channel
    @Component
    public interface ResourceRecycleInputChannelProcessor {    
        /**     
        * k8s资源回收输出     
        */    
        String RESOURCE_RECYCLE_INPUT = "resourceRecycleInput";   
        
        @Input(RESOURCE_RECYCLE_INPUT)    
        SubscribableChannel resourceRecycleInput();
    }
    
    listener队列监听
    @Slf4j
    @EnableBinding({ResourceRecycleInputChannelProcessor.class})
    public class ResourceRecycleListener {
        /**
         * 系统k8s资源回收
         * 失败后重试3次, 可配置重试次数
         * @param message
         * @throws AmqpRejectAndDontRequeueException
         */
        @StreamListener(ResourceRecycleInputChannelProcessor.RESOURCE_RECYCLE_INPUT)            public void recycleResources(String message) throws AmqpRejectAndDontRequeueException{
    }
    
    listener/dlq死信队列监听
    @Slf4j
    @Component
    public class ResourceRecycleDlqListener {
        private final String QUEUE_NAME = "cloudlab.resrecycle.gk8s.dlq";
        /**     
         * 死信消费     
         * @param message     
         */
        @RabbitListener(queues = QUEUE_NAME)
        public void consumer(String message)  {
        }
    }
    

    producer生产者

    channel
    @Component
    public interface ResourceRecycleOutputChannelProcessor {    
        /**     
         * k8s资源回收消息输入     
         */    
        String RESOURCE_RECYCLE_OUTPUT = "resourceRecycleOutput";    
        @Output(RESOURCE_RECYCLE_OUTPUT)    
        MessageChannel resourceRecycleOutput();
    }
    
    sender
    @Slf4j
    @EnableBinding(value = {ResourceRecycleOutputChannelProcessor.class})
    public class ResourceRecycleProducer extends BaseProducer implements ProducerService {    
        @Autowired    
        @Output(ResourceRecycleOutputChannelProcessor.RESOURCE_RECYCLE_OUTPUT)    
        private MessageChannel channel;    
        /**     
        * 消息发送     
        * @param message     
        */    
        @Override    
        public R<String> send(MessageModel message) {        
            try {            
                boolean result = channel.send(
                    MessageBuilder                      
                        .withPayload(JSON.toJSONString(message.getBody()))                  
                        .setHeaders(
                            this.buildMessageHeader(                            
                                message.getHeaders(),                            
                                message.isEnableDelay(),                            
                                message.getDelay()                    
                            )
                        )                    
                        .build()            
                );            
                return R.ok();        
            } catch(Exception e) {            
                return R.fail();        
            }    
        }
    }
    
    public interface ProducerService {    
        /**     
         * 发送消息     
         *     
         * @param messge     
         * @return     
         */     
         R<String> send(MessageModel messge);
    }
    
    public abstract class BaseProducer {    
        @FunctionalInterface    
        interface DefFunction {        
            void exec(Map<String, Object> v1, Long v2);    
        }    
        /**     
         * 根据配置设置delay值     
         */    
        private Map<Boolean, DefFunction> setDelay = new HashMap<Boolean, DefFunction>(){{   
            put(true, (headers, delay) -> headers.put("x-delay", delay));        
            put(false, (headers, delay) -> headers.remove("x-delay"));    
        }};    
        /**     
         * 构建消息头部参数信息     
         *     
         * @param headers     
         * @return     
         */    
        protected MessageHeaderAccessor buildMessageHeader(Map<String, Object> headers, Boolean isDelay, Long delay) {        
            MessageHeaderAccessor msgHeaderAccessor = new MessageHeaderAccessor();        
            setDelay.get(isDelay).exec(headers, delay);        
            // 构造header,判断是否需要发送延时消息        
            Optional.ofNullable(headers).ifPresent(                
                hMap -> hMap.forEach((k, v) -> msgHeaderAccessor.setHeader(k, v))        
            );        
            return msgHeaderAccessor;    
        }
    }
    
    延时队列

    rabbitmq_delayed_message_exchange 插件官方地址

    相关文章

      网友评论

          本文标题:SpringCloud Stream 集成 rabbitMQ

          本文链接:https://www.haomeiwen.com/subject/qdgtqrtx.html