美文网首页
spring-cloud-stream 进阶篇

spring-cloud-stream 进阶篇

作者: 迷狮 | 来源:发表于2022-05-20 00:00 被阅读0次

    自定义SNS Binder

    依据官方文档中自定义binder的过程,结合idealo/spring-cloud-stream-binder-sns,实现SNS Binder的消息发送和接收

    整体逻辑图

    消息逻辑图

    1. Add the required dependencies

    添加maven依赖,包括stream、aws及integration

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>io.awspring.cloud</groupId>
        <artifactId>spring-cloud-starter-aws</artifactId>
    </dependency>
    <dependency>
        <groupId>io.awspring.cloud</groupId>
        <artifactId>spring-cloud-aws-messaging</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-aws</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-http</artifactId>
    </dependency>
    

    2. Provide a ProvisioningProvider implementation

    /**
     * Provisioning provider of the SNS binder.
     *
     * <p>Producer and consumer destinations are provisioned in exactly the same way:
     * the binding name is handed to a {@link DynamicTopicDestinationResolver} and the
     * result is wrapped, together with the name, in an {@link SnsDestination}.
     */
    public class SnsStreamProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {

        /** Resolver used to look up the SNS topic behind a binding name. */
        private final DynamicTopicDestinationResolver destinationResolver;

        /**
         * Builds the provisioner and switches the underlying resolver to auto-create mode.
         *
         * @param amazonSNS          SNS client handed to the resolver
         * @param resourceIdResolver resolver handed through to the topic resolver
         *                           (the binder configuration passes {@code null} when none is available)
         */
        public SnsStreamProvisioner(AmazonSNS amazonSNS, ResourceIdResolver resourceIdResolver) {
            DynamicTopicDestinationResolver topicResolver =
                    new DynamicTopicDestinationResolver(amazonSNS, resourceIdResolver);
            topicResolver.setAutoCreate(true);
            this.destinationResolver = topicResolver;
        }

        /**
         * Resolves the destination a producer binding publishes to.
         *
         * @param name       binding destination name
         * @param properties producer properties (not consulted)
         * @return the resolved {@link SnsDestination}
         */
        @Override
        public ProducerDestination provisionProducerDestination(String name, ProducerProperties properties) throws ProvisioningException {
            return toSnsDestination(name);
        }

        /**
         * Resolves the destination a consumer binding listens on.
         *
         * @param name       binding destination name
         * @param group      consumer group (not consulted)
         * @param properties consumer properties (not consulted)
         * @return the resolved {@link SnsDestination}
         */
        @Override
        public ConsumerDestination provisionConsumerDestination(String name, String group, ConsumerProperties properties) throws ProvisioningException {
            return toSnsDestination(name);
        }

        /** Shared lookup: pairs the binding name with whatever the resolver returns for it. */
        private SnsDestination toSnsDestination(String name) {
            return new SnsDestination(name, this.destinationResolver.resolveDestination(name));
        }
    }
    

    3. Provide a MessageProducer implementation

    /**
     * Inbound endpoint of the SNS binder: a {@link MessageProducerSupport} that
     * remembers which {@link SnsDestination} it was created for.
     */
    public class SnsMessageProducer extends MessageProducerSupport {

        /** Destination this producer is bound to. */
        private final SnsDestination destination;

        /**
         * @param destination destination this producer serves
         */
        public SnsMessageProducer(SnsDestination destination) {
            this.destination = destination;
        }

        /**
         * Forwards the message to the parent implementation without modification.
         * The method is re-declared {@code public} here, presumably so that collaborators
         * outside the class hierarchy can hand messages to this producer.
         *
         * @param messageArg message to forward
         */
        @Override
        public void sendMessage(Message<?> messageArg) {
            super.sendMessage(messageArg);
        }

        /**
         * @return the destination passed to the constructor
         */
        public SnsDestination getDestination() {
            return this.destination;
        }
    }
    

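    4. Provide a MessageHandler implementation

    此处无需自行实现,直接使用spring-integration-aws提供的SnsMessageHandler,具体配置见下文SnsMessageBinder#createProducerMessageHandler。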

    5. Provide a Binder implementation

    /**
     * Message-channel binder for SNS.
     *
     * <p>Outbound messages are published through an {@link SnsMessageHandler};
     * inbound bindings are represented by an {@link SnsMessageProducer}.
     */
    public class SnsMessageBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, SnsStreamProvisioner> {

        /** Async SNS client handed to every producer-side message handler. */
        private final AmazonSNSAsync amazonSNS;

        /**
         * @param amazonSNS            SNS client used for publishing
         * @param provisioningProvider provisioner that resolves destinations
         */
        public SnsMessageBinder(AmazonSNSAsync amazonSNS, SnsStreamProvisioner provisioningProvider) {
            // No headers are registered with the parent binder (empty array).
            super(new String[0], provisioningProvider);
            this.amazonSNS = amazonSNS;
        }

        /**
         * Creates the handler that publishes outbound messages to the topic of the
         * given destination.
         *
         * @param destination        must be an {@link SnsDestination}
         * @param producerProperties producer properties (not consulted)
         * @param errorChannel       channel installed as the handler's failure channel
         * @return the configured {@link SnsMessageHandler}
         */
        @Override
        protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ProducerProperties producerProperties, MessageChannel errorChannel) throws Exception {
            final SnsDestination target = (SnsDestination) destination;

            final SnsMessageHandler handler = new SnsMessageHandler(this.amazonSNS);
            handler.setTopicArn(target.getArn());
            handler.setBeanFactory(super.getBeanFactory());
            handler.setFailureChannel(errorChannel);

            // Expression setup: the SNS subject is taken from the payload's bindingName property.
            // NOTE(review): postProcessOutputChannel installs SnsPayloadConvertingChannelInterceptor,
            // which turns the payload into a String - confirm "payload.bindingName" still resolves.
            final SpelExpressionParser parser = new SpelExpressionParser();
            final Expression subject = parser.parseExpression("#{payload.bindingName}", ParserContext.TEMPLATE_EXPRESSION);
            handler.setSubjectExpression(subject);
            // Disabled in the original: a body expression of "#{payload.data}" set via setBodyExpression(...).
            return handler;
        }

        /**
         * Creates the inbound endpoint for a consumer binding.
         *
         * @param consumerDestination must be an {@link SnsDestination}
         * @param group               consumer group (not consulted)
         * @param properties          consumer properties (not consulted)
         * @return producer whose bean name is {@code "inbound."} followed by the destination name
         */
        @Override
        protected MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String group, ConsumerProperties properties) throws Exception {
            final SnsDestination source = (SnsDestination) consumerDestination;
            final SnsMessageProducer inbound = new SnsMessageProducer(source);
            inbound.setBeanName("inbound." + consumerDestination.getName());
            inbound.setBeanFactory(super.getBeanFactory());
            return inbound;
        }

        /**
         * Registers the payload-converting interceptor on the output channel.
         * The channel is cast to {@link AbstractMessageChannel} to reach {@code addInterceptor}.
         *
         * @param outputChannel      channel to decorate
         * @param producerProperties producer properties (not consulted)
         */
        @Override
        protected void postProcessOutputChannel(MessageChannel outputChannel, ProducerProperties producerProperties) {
            final AbstractMessageChannel channel = (AbstractMessageChannel) outputChannel;
            channel.addInterceptor(new SnsPayloadConvertingChannelInterceptor());
        }
    }
    

    6. Create a Binder Configuration

    /**
     * Spring configuration that wires the SNS binder: provisioner, inbound channel
     * adapter and the binder itself. Skipped entirely when a {@link Binder} bean
     * is already defined.
     */
    @Configuration
    @ConditionalOnMissingBean(Binder.class)
    public class SnsBinderConfiguration {

        /**
         * @param amazonSNS          SNS client
         * @param resourceIdResolver optional resolver; {@code null} is passed on when absent
         * @return the provisioner used by the binder
         */
        @Bean
        @ConditionalOnMissingBean
        public SnsStreamProvisioner snsStreamProvisioner(AmazonSNS amazonSNS, Optional<ResourceIdResolver> resourceIdResolver) {
            final ResourceIdResolver resolverOrNull = resourceIdResolver.orElse(null);
            return new SnsStreamProvisioner(amazonSNS, resolverOrNull);
        }

        /**
         * @param amazonSNS          SNS client
         * @param subscriptionPrefix value of {@code cloud.aws.sns.subscription.endpoint},
         *                           empty string when the property is not set
         * @return the adapter that receives messages pushed by SNS
         */
        @Bean
        @ConditionalOnMissingBean
        public CustomSnsInboundChannelAdapter snsInboundChannel(AmazonSNS amazonSNS,
                                                                @Value("${cloud.aws.sns.subscription.endpoint:}") String subscriptionPrefix) {
            return new CustomSnsInboundChannelAdapter(amazonSNS, subscriptionPrefix);
        }

        /**
         * Creates the binder and installs the customizer that links every consumer
         * endpoint to the inbound channel adapter.
         *
         * @param amazonSNS             async SNS client used for publishing
         * @param snsStreamProvisioner  destination provisioner
         * @param inboundChannelAdapter adapter the consumer endpoints are registered with
         * @return the fully configured binder
         */
        @Bean
        @ConditionalOnMissingBean
        public SnsMessageBinder snsMessageBinder(AmazonSNSAsync amazonSNS,
                                                 SnsStreamProvisioner snsStreamProvisioner,
                                                 CustomSnsInboundChannelAdapter inboundChannelAdapter) {
            final CustomSnsInboundCustomizer customizer = new CustomSnsInboundCustomizer(inboundChannelAdapter);
            final SnsMessageBinder binder = new SnsMessageBinder(amazonSNS, snsStreamProvisioner);
            binder.setConsumerEndpointCustomizer(customizer);
            return binder;
        }
    }
    

    7. Define your binder in META-INF/spring.binders

    sns=com.kadadesign.stream.binder.sns.config.SnsBinderConfiguration
    

    关键内容解析

    发送消息内容转换

    使用StreamBridge发送消息时,系统会自动将要发送的内容转换为byte[];在通过snsMessageHandler将消息发送到SNS之前,需要先把消息内容转换为String,见SnsMessageBinder#postProcessOutputChannel。

    SnsPayloadConvertingChannelInterceptor如下所示:

    /**
     * Channel interceptor that turns a {@code byte[]} payload into a {@link String}
     * before the message is handed on, keeping the original headers.
     */
    public class SnsPayloadConvertingChannelInterceptor implements ChannelInterceptor {

        /**
         * Converts a {@code byte[]} payload to text; every other payload type is
         * passed through untouched instead of failing with a {@link ClassCastException}.
         *
         * @param message message about to be sent
         * @param channel channel the message is sent to (not consulted)
         * @return a new message with a {@code String} payload and the same headers,
         *         or the original message when its payload is not a {@code byte[]}
         */
        @Override
        public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
            final Object payload = message.getPayload();
            if (!(payload instanceof byte[])) {
                // Nothing to decode (e.g. the payload is already a String).
                return message;
            }
            // Decode with an explicit charset: new String(byte[]) would use the platform default,
            // which varies between JVMs. UTF-8 is assumed to match the upstream serialization - confirm.
            final String text = new String((byte[]) payload, java.nio.charset.StandardCharsets.UTF_8);
            return MessageBuilder.createMessage(text, message.getHeaders());
        }
    }
    

    消息接收通道适配(InboundChannelAdapter)

    通过SNS订阅,SNS将消息推送给InboundChannelAdapter,InboundChannelAdapter需要处理消息,并将消息发送到对应的MessageProducer中。

    ConsumerEndpoint配置

    /**
     * Consumer endpoint customizer that registers every {@link SnsMessageProducer}
     * created by the binder with the shared inbound channel adapter.
     */
    public class CustomSnsInboundCustomizer implements ConsumerEndpointCustomizer<SnsMessageProducer> {

        /** Adapter that keeps the destination/group to producer routing. */
        private final CustomSnsInboundChannelAdapter inboundChannelAdapter;

        /**
         * @param inboundChannelAdapter adapter the endpoints are registered with
         */
        public CustomSnsInboundCustomizer(CustomSnsInboundChannelAdapter inboundChannelAdapter) {
            this.inboundChannelAdapter = inboundChannelAdapter;
        }

        /**
         * Placeholder for receiving a message and dispatching it to the matching
         * {@link SnsMessageProducer}.
         *
         * <p>Not annotated with {@code @Override}: this class only implements an interface,
         * and an interface cannot declare a protected method, so the annotation did not compile.
         * NOTE(review): this hook looks like it belongs in CustomSnsInboundChannelAdapter - confirm.
         *
         * @param object incoming message object
         */
        protected void send(Object object) {
            // 1. Custom message processing
            // 2. Look up the SnsMessageProducer by the message Subject
            // 3. Send the message
        }

        /**
         * Links the freshly created endpoint to its destination and group on the adapter.
         *
         * @param endpoint        consumer endpoint created by the binder
         * @param destinationName name of the bound destination
         * @param group           consumer group
         */
        @Override
        public void configure(SnsMessageProducer endpoint, String destinationName, String group) {
            // Establish the routing relationship.
            this.inboundChannelAdapter.addRouter(destinationName, group, endpoint);
        }
    }
    

    InboundChannelAdapter实现

    public class CustomSnsInboundChannelAdapter extends SnsInboundChannelAdapter implements ApplicationRunner{
        // 绑定producer
        public void addRouter(String destinationName, String group, SnsMessageProducer producer){// ...}
    
       // 订阅SNS
        @Override
        public void run(ApplicationArguments args) throws Exception {// ...}
    }
    

    参考资料

    相关文章

      网友评论

          本文标题:spring-cloud-stream 进阶篇

          本文链接:https://www.haomeiwen.com/subject/ejsxprtx.html