美文网首页
【JAVA】RabbitMQ

【JAVA】RabbitMQ

作者: Y了个J | 来源:发表于2019-03-24 21:04 被阅读0次
    RabbitMQ的工作原理
    屏幕快照 2019-03-24 下午8.14.22.png

    组成部分说明如下:

    Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
    Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
    Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
    Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
    Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
    

    消息发布接收流程:
    -----发送消息-----
    1、生产者和Broker建立TCP连接。
    2、生产者和Broker建立通道。
    3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
    4、Exchange将消息转发到指定的Queue(队列)

    ----接收消息-----
    1、消费者和Broker建立TCP连接
    2、消费者和Broker建立通道
    3、消费者监听指定的Queue(队列)
    4、当有消息到达Queue时Broker默认将消息推送给消费者。
    5、消费者接收到消息。

    发送端操作流程
    1)创建连接
    2)创建通道
    3)声明队列
    4)发送消息

    接收端
    1)创建连接
    2)创建通道
    3)声明队列
    4)监听队列
    5)接收消息
    6)ack回复

    工作模式
    RabbitMQ有以下几种工作模式 :
    1、Work queues
    2、Publish/Subscribe
    3、Routing
    4、Topics
    5、Header
    6、RPC

    屏幕快照 2019-03-24 下午8.35.17.png

    work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。 应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

    测试:
    1、使用入门程序,启动多个消费者。
    2、生产者发送多个消息。
    结果:
    1、一条消息只会被一个消费者接收;
    2、rabbit采用轮询的方式将消息是平均发送给消费者的;
    3、消费者在处理完某条消息后,才会收到下一条消息。

    屏幕快照 2019-03-24 下午8.37.18.png

    发布订阅模式:
    1、每个消费者监听自己的队列。
    2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

    publish/subscribe与work queues有什么区别。

    1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
    2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。
    3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑定到默认的交换机 。
    相同点: 所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。

    实质工作用什么 publish/subscribe还是work queues。
    建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大,并且发布订阅模式可以指定自己专用的交换机。

    屏幕快照 2019-03-24 下午8.42.12.png

    路由模式:
    1、每个消费者监听自己的队列,并且设置routingkey。
    2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。

    Routing模式和Publish/subscibe有啥区别?

    Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。

    屏幕快照 2019-03-24 下午8.46.36.png

    Topics路由模式:
    1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
    2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

    SpringBoot整合RibbitMQ

    使用spring-boot-starter-amqp会自动添加spring-rabbit依赖,如下:

    <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring‐boot‐starter‐amqp</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring‐boot‐starter‐test</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring‐boot‐starter‐logging</artifactId>
       </dependency>
    

    配置application.yml

    server:
         port: 44000
       spring:
         application:
           name: test‐rabbitmq‐producer
         rabbitmq:
           host: 127.0.0.1
           port: 5672
           username: guest
           passowrd: guest
           virtualHost: /
    

    定义RabbitConfig类,配置Exchange、Queue、及绑定交换机。本例配置Topic交换机。

    @Configuration
    public class RabbitmqConfig {
        public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    
        @Bean(EXCHANGE_TOPICS_INFORM)
        public Exchange EXCHANGE_TOPICS_INFORM() {
          //durable(true)持久化,消息队列重启后交换机仍然存在
          return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
        }
    
          //声明队列
          @Bean(QUEUE_INFORM_SMS)
          public Queue QUEUE_INFORM_SMS() {
            Queue queue = new Queue(QUEUE_INFORM_SMS);
            return queue;
          }
    
        //声明队列
        @Bean(QUEUE_INFORM_EMAIL)
        public Queue QUEUE_INFORM_EMAIL() {
            Queue queue = new Queue(QUEUE_INFORM_EMAIL);
            return queue;
        }
    
        //channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#"); * 绑定队列到交换机 
        @Bean
        public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
        @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
        }
    
        @Bean
        public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
        @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
        }
    }
    

    生产端,使用RarbbitTemplate发送消息

       @SpringBootTest
       @RunWith(SpringRunner.class)
       public class Producer05_topics_springboot {
          @Autowired
           RabbitTemplate rabbitTemplate;
           @Test
           public void testSendByTopics(){
               for (int i=0;i<5;i++){
                   String message = "sms email inform to user"+I;
                   rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message);
                   System.out.println("Send Message is:'" + message + "'");
                }
            }
        }
    

    消费端,使用@RabbitListener注解监听队列。

       @Component
       public class ReceiveHandler {
        //监听email队列
        @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
        public void receive_email(String msg,Message message,Channel channel){
               System.out.println(msg);
        }
    
        //监听sms队列
        @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
        public void receive_sms(String msg,Message message,Channel channel){
               System.out.println(msg);
         }
    }
    

    测试


    屏幕快照 2019-03-24 下午9.03.46.png

    Mac安装rabbitmq
    brew install rabbitmq
    brew services list
    brew services start rabbitmq

    相关文章

      网友评论

          本文标题:【JAVA】RabbitMQ

          本文链接:https://www.haomeiwen.com/subject/mukpvqtx.html