rabbitmq

作者: 78f6ced3a012 | 来源:发表于2019-06-07 22:34 被阅读0次

    先说一下安装rabbitmq
    1,安装opt_win64_22.0.exe (Erlang )
    2,安装rabbitmq
    3,计算机--管理--服务 找到rabbitmq服务右键属性,更改为当前登录人并填写密码,然后重启服务
    4,去安装目录C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin的路径的cmd窗口 执行:rabbitmq-plugins enable rabbitmq_management 打开管理窗口
    5,管理地址:http://localhost:15672 登录名:guest,密码:guest(注意:本地登录才有效,远程登录不行)

    概述:
    rabbitmq是一个消息队列,主要用于系统之间的异步和解耦,同时也能起到消息缓存,消息分发的作用。
    一般的消息队列有3个关键部分,生产者,队列,消费者;但是rabbitmq还有一个exchange(交换机);交换机的作用是:生产者把消息给交换机然后根据策略路由到队列上面去保存;交换机不能保存消息。
    rabbitmq还有一个虚拟主机的概念:其作用就是权限隔离,A虚拟机的交换机和队列跟B虚拟机的交换机和队列是不能互相访问(针对用户),就是用户仅有A虚拟机的权限就不能去访问B虚拟机里的交换机和队列。

    交换机四种模式(每个队列中的消息只能被消费一次):
    交换机的作用就是把消息路由到绑定的队列中去。
    direct:简单模式,一个发,另一个收
    topic:主题模式,比direct灵活
    fanout:广播模式,绑定的队列全部都能消费
    headers:设置 header attribute 参数类型的交换机(使用的少,后期再琢磨)
    特性:
    ack:应答机制,用于很重要的消息,必须确保送到且消费。默认是自动应答;如需手动应答需要自己设置配置文件。
    durable:持久化机制,把队列中的数据持久化到硬盘,避免rabbitmq服务器down机,而数据不会丢失。

    贴代码(springboot整合rabbitmq):
    properties配置:

    #rabbitmq配置
    spring.rabbitmq.addresses=127.0.0.1
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.port=5672
    
    # 开启ACK
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    

    direct模式(不用配置交换机,有默认的)
    配置队列:

    package com.example.rabbitmq1.config;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    /**
     * rabbitmq的默认模式:direct,最简单的队列模式,一个生产者,一个消费者,交换机也是默认的不用创建,路由key默认是队列名
     * 1个消息只能被消费一次,如果有多个消费者,rabbitmq服务器会自己做负载均衡,平均消费
     */
    @Configuration
    public class DirectConfig {
        public static final String queue_direct_a = "queue.direct.a";
        public static final String queue_direct_user = "queue.direct.user";
    
        @Bean(name = queue_direct_a)
        public Queue queue1(){
            return new Queue(queue_direct_a);
        }
    
        @Bean(name = queue_direct_user)
        public Queue queue2(){
            return new Queue(queue_direct_user);
        }
    }
    

    生产者:

    package com.example.rabbitmq1.producer;
    import com.example.rabbitmq1.config.DirectConfig;
    import com.example.rabbitmq1.model.User;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    /**
     * 生产者
     */
    @Component
    public class DirectProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void sendMsg(String info){
            String now = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
            String msg = "时间: " + now + ", info: " + info;
            amqpTemplate.convertAndSend(DirectConfig.queue_direct_a, msg);
        }
    
        /**
         * mq之间传递Java对象(Java对象要实现序列化接口)
         * @param user
         */
        public void sendMsg(User user){
            amqpTemplate.convertAndSend(DirectConfig.queue_direct_user, user);
        }
    }
    

    消费者:

    package com.example.rabbitmq1.consumer;
    import com.example.rabbitmq1.config.DirectConfig;
    import com.example.rabbitmq1.model.User;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = DirectConfig.queue_direct_user)
    public class DirectConsumer3 {
    
        @RabbitHandler
        public void receive(User user){
            // 如果是String类型,接收类型就改为String
            System.out.println("-----------------------------user--------------------------");
            System.out.println(user);
        }
    }
    

    topic模式
    配置队列,交换机,互相绑定

    package com.example.rabbitmq1.config;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * topic模式相比direct模式就是灵活一点(在同一个队列中也是1个消息实体只能被消费1次)
     * 一个交换机可以绑定到多个队列上,也可以使用匹配模式来匹配符合规则的队列
     * 匹配模式:*代表一个元素,#代表0个或多个元素
     * 队列命名原则:aaa.bbbb.ccc   以.隔开
     */
    @Configuration
    public class TopicConfig {
    
        public static final String queue_topic_a = "queue.topicA";  //不支持queue.topic.a    三级模式(这个坑,我刚踩完)
    
        public static final String queue_topic_b = "queue.topicB";
    
        public static final String exchange_topic = "exchange_topic";
    
    
        @Bean(name = queue_topic_a)
        public Queue queue1(){
            return new Queue(queue_topic_a);
        }
    
        @Bean(name = queue_topic_b)
        public Queue queue2(){
            return new Queue(queue_topic_b);
        }
    
        @Bean(name = exchange_topic)
        public TopicExchange exchange(){
            return new TopicExchange(exchange_topic);
        }
    
        /**
         * 把队列绑定到交换机上面
         * with方法中的参数是路由key
         * 在使用的时候会根据交换和路由队列名来推送消息,
         * 如果队列名符合路由key的规则就会把消息推送到绑定的队列中去
         * @param queue
         * @param topicExchange
         * @return
         */
        @Bean
        public Binding bindingExchangeA(@Qualifier(queue_topic_a) Queue queue, TopicExchange topicExchange){
            return BindingBuilder.bind(queue).to(topicExchange).with(queue_topic_a);
        }
    
        @Bean
        public Binding bindingExchangeB(@Qualifier(queue_topic_b) Queue queue, TopicExchange topicExchange){
            return BindingBuilder.bind(queue).to(topicExchange).with("queue.*");
        }
    }
    

    生产者:

    package com.example.rabbitmq1.producer;
    import com.example.rabbitmq1.config.TopicConfig;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TopicProducer {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void sendMsg(String text){
            amqpTemplate.convertAndSend(TopicConfig.exchange_topic, TopicConfig.queue_topic_b, text);
        }
    }
    

    消费者:

    package com.example.rabbitmq1.consumer;
    
    import com.example.rabbitmq1.config.TopicConfig;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TopicConsumer {
    
        @RabbitListener(queues = TopicConfig.queue_topic_a)
        public void receiveA(String info){
            System.out.println("-----------------------队列A:" + info);
        }
    
        @RabbitListener(queues = TopicConfig.queue_topic_b)
        public void receiveB(String info){
            System.out.println("-----------------------队列B:" + info);
        }
    }
    

    fanout模式就是广播模式,和topic类似,把不同点说说

        @Bean
        public FanoutExchange exchange() {
            // 是fanout的交换机
            return new FanoutExchange(exchange_fanout);
        }
    
        @Bean
        public Binding bindingFanoutA(@Qualifier(queue_fanout_a) Queue queue, FanoutExchange fanoutExchange){
            // 没有路由key,(后面没有with)
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
    
        // 生产者,注意队列名为null
        public void sendMsg(String info){
    //        amqpTemplate.convertAndSend(FanoutConfig.exchange_fanout, info);   此用法无效
            amqpTemplate.convertAndSend(FanoutConfig.exchange_fanout, null, info);
        }
    

    应答机制(properties记得开启配置):

    package com.example.rabbitmq1.consumer;
    import com.example.rabbitmq1.config.FanoutConfig;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import java.io.IOException;
    
    @Component
    public class FanoutConsumer {
    
        @RabbitListener(queues = FanoutConfig.queue_fanout_a)
        public void receive1(String info, Channel channel, Message message){
            try {
                // 应答这条消息,如果没有应答,下次连接消息依然会收到
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                System.out.println("----------------------------队列A: "+info);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    
        @RabbitListener(queues = FanoutConfig.queue_fanout_b)
        public void receive2(String info, Channel channel, Message message){
            try {
                // 销毁这条消息(在队列中销毁)
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("----------------------------队列B: "+info);
        }
    }
    
    

    持久化机制:默认是持久化
    看源码的构造方法:队列和交换机
    队列:

    public class Queue extends AbstractDeclarable {
        public static final String X_QUEUE_MASTER_LOCATOR = "x-queue-master-locator";
        private final String name;
        private final boolean durable;
        private final boolean exclusive;
        private final boolean autoDelete;
        private final Map<String, Object> arguments;
        private volatile String actualName;
    
        public Queue(String name) {
            this(name, true, false, false);  //看第2个参数默认true
        }
    
        public Queue(String name, boolean durable) {
            this(name, durable, false, false, (Map)null);
        }
    
        public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
            this(name, durable, exclusive, autoDelete, (Map)null);
        }
    }
    

    交换机:

    public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
        private final String name;
        private final boolean durable;
        private final boolean autoDelete;
        private final Map<String, Object> arguments;
        private volatile boolean delayed;
        private boolean internal;
    
        public AbstractExchange(String name) {
            this(name, true, false);  //看第2个参数默认为true
        }
    
        public AbstractExchange(String name, boolean durable, boolean autoDelete) {
            this(name, durable, autoDelete, (Map)null);
        }
    }
    

    相关文章

      网友评论

          本文标题:rabbitmq

          本文链接:https://www.haomeiwen.com/subject/vlgvxctx.html