美文网首页
RabbitMQ学习(八)与SpringCloudStream整

RabbitMQ学习(八)与SpringCloudStream整

作者: kobe0429 | 来源:发表于2018-11-30 14:05 被阅读0次

    一、Spring Cloud Stream介绍

    Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。在本文中,我们将通过一些简单的例子来介绍Spring Cloud Stream的概念和构造。
    在 SpringBoot 的之中为了方便开发者去整合消息组件,也提供有一系列的处理支持,整合方式无非是在pom.xml中引入jar包,在配置文件定义好配置,然后通过@Autowired注解将RabbitTemplate引入,之后做业务处理即可。只是在 SpringCloud 里面将消息整合的处理操作进行了进一步的抽象操作, 实现了更加简化的消息处理。在整个 SpringCloud 之中支持有 RabbitMQ、Kafka 组件的消息系统。使得我们可以实现消息的生产端和消费端分别使用RabbitMQ、Kafka。

    rmq与kafka.JPG
    利用 SpringCloudStream 可以实现更加方便的消息系统的整合处理。SpringCloudStream 就是实现了 MDB 功能,同时可以更加简化方便的整合消息组件。

    二、Spring Cloud Stream与RMQ整合原理
    SpringCloudStream核心架构.JPG
    说明:最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费
    • Barista接口:Barista接口是定义来作为后面类的参数,这一接口定义通道类型和通道名称,通道名称是作为配置用通道类型决定了app会使用这一通道进行发送消息还是从中接收消息
    • @Output: 输出注解,用于定义发送消息接口
    • @Input: 输入注解,用于定义消息的消费者接口
    • @StreamListener: 用于定义监听方法的注解
      使用Spring Cloud Stream 非常简单,只需要使用好3个注解即可,在实现高性能消息的生成和消费场景非常合适,但是使用Spring Cloud Stream框架有一个非常大的问题就是不能实现可靠性投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失问题。
      这个原因是因为SpringCloudStream框架为了和Kafka兼顾所有在实际工作中使用它的目的就是针对高性能的消息通信的!

    三、Spring Cloud Stream与RMQ整合的代码实现

    因为Spring Cloud为微服务架构,在领域设计中一般将消息的生产者和消费者分开,此处我们也按2个微服务项目来讨论整合步骤。

    针对消息生产者:

    1、在pom.xml中添加maven依赖

    <dependency>
         <groupId>org.springframework.cloud</groupId>
             <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
             <version>1.3.4.RELEASE</version>
    </dependency>
    

    2、在application.properties或application.yml中添加自定义配置

    spring.cloud.stream.bindings.output_channel.destination=exchange-3
    spring.cloud.stream.bindings.output_channel.group=queue-3
    spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster
    
    spring.cloud.stream.binders.rabbit_cluster.type=rabbit
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.11.76:5672
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
    

    3、创建Barista.java类

    package com.bfxy.rabbitmq.stream;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    /**
     * <B>中文类名:</B><BR>
     * <B>概要说明:</B><BR>
     * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
     * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
     */
    public interface Barista {
        String OUTPUT_CHANNEL = "output_channel";  
    
        //注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题  
    
        //注解@Output声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。  
        @Output(Barista.OUTPUT_CHANNEL)
        MessageChannel logoutput();  
    
    //  String INPUT_BASE = "queue-1";  
    //  String OUTPUT_BASE = "queue-1";  
    //  @Input(Barista.INPUT_BASE)  
    //  SubscribableChannel input1();  
    //  MessageChannel output1();  
          
    }  
    

    4、创建生产者发送消息的类
    首先给类加上@EnableBinding(Barista.class),将我们之前创建的Barista类初始化,然后 @Autowired private Barista barista; 注入进来,最后写send方法的具体实现。

    package com.bfxy.rabbitmq.stream;
    
    import java.util.Map;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Service;
    
    @EnableBinding(Barista.class)
    @Service  
    public class RabbitmqSender {  
        @Autowired  
        private Barista barista;      
        // 发送消息
        public String sendMessage(Object message, Map<String, Object> properties) throws Exception {  
            try{
                MessageHeaders mhs = new MessageHeaders(properties);
                Message msg = MessageBuilder.createMessage(message, mhs);
                boolean sendStatus = barista.logoutput().send(msg);
                System.err.println("--------------sending -------------------");
                System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
            }catch (Exception e){  
                System.err.println("-------------error-------------");
                e.printStackTrace();
                throw new RuntimeException(e.getMessage());       
            }  
            return null;
        }      
    }  
    

    针对消息的消费端

    1、在pom.xml中添加maven依赖

    <dependency>
         <groupId>org.springframework.cloud</groupId>
             <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
             <version>1.3.4.RELEASE</version>
    </dependency>
    

    2、在application.properties或application.yml中添加自定义配置

    spring.cloud.stream.bindings.input_channel.destination=exchange-3
    spring.cloud.stream.bindings.input_channel.group=queue-3
    spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
    spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5
    
    spring.cloud.stream.binders.rabbit_cluster.type=rabbit
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.11.76:5672
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
    

    消费端有个性化设置,配置比生产端要多
    3、创建Barista.java类

    package com.bfxy.rabbitmq.stream;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * <B>中文类名:</B><BR>
     * <B>概要说明:</B><BR>
     * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
     * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
     * @author ashen(Alienware)
     */
    
    public interface Barista {
          
        String INPUT_CHANNEL = "input_channel";  
    
        //注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题  
        @Input(Barista.INPUT_CHANNEL)  
        SubscribableChannel loginput();       
    }  
    

    4、创建消费者消费消息的类

    package com.bfxy.rabbitmq.stream;
    
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Service;
    
    import com.rabbitmq.client.Channel;
    
    
    @EnableBinding(Barista.class)
    @Service
    public class RabbitmqReceiver {  
    
        @StreamListener(Barista.INPUT_CHANNEL)  
        public void receiver(Message message) throws Exception {  
            Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
            Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            System.out.println("Input Stream 1 接受数据:" + message);
            System.out.println("消费完毕------------");
            channel.basicAck(deliveryTag, false);
        }  
    }  
    

    最后新建一个测试类,

    package com.bfxy.rabbitmq;
    
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.http.client.utils.DateUtils;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import com.bfxy.rabbitmq.stream.RabbitmqSender;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApplicationTests {
    
        @Autowired
        private RabbitmqSender rabbitmqSender;
        
        
        @Test
        public void sendMessageTest1() {
           for(int i = 0; i < 1; i ++){
               try {
                   Map<String, Object> properties = new HashMap<String, Object>();
                   properties.put("SERIAL_NUMBER", "12345");
                   properties.put("BANK_NUMBER", "abc");
                   properties.put("PLAT_SEND_TIME", DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
                   rabbitmqSender.sendMessage("Hello, I am amqp sender num :" + i, properties);
                  
               } catch (Exception e) {
                   System.out.println("--------error-------");
                   e.printStackTrace(); 
               }
           }
           //TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
        }
        
    }
    

    启动测试类,消费者和生产这款,查看生产者控制台发送数据成功,消费者控制台消费数据成功,rabbitmq管控台消息发送消费成功。


    生产者日志信息.JPG
    消费者日信息.JPG
    RabbitMQ管控台.JPG

    相关文章

      网友评论

          本文标题:RabbitMQ学习(八)与SpringCloudStream整

          本文链接:https://www.haomeiwen.com/subject/etgccqtx.html