美文网首页
4.RabbitMq消息队列

4.RabbitMq消息队列

作者: Junma_c631 | 来源:发表于2020-10-28 09:37 被阅读0次

    一、利用docker安装rabbitmq

    https://www.cnblogs.com/yufeng218/p/9452621.html
    rabbitmq技术文档:https://www.jianshu.com/p/78847c203b76
    Broker:消息系统
    Virtual host(虚拟地址,可以理解为命名空间)
    Connection:(TCP生产者或者消费端和消息系统建立的长连接)
    Channel:每一个连接里面可以划分出很多渠道,一个消息的发送通过一个渠道发送到交换机。
    Exchange:交换机(根据路由健把消息分发到不同的队列)
    Queue:队列,存储消息的队列
    Binding:绑定器,绑定交换机的路由健和队列。

    image.png

    路由键有四种:

    direct :

    这种类型的交换机的路由规则是根据一个routingKey的标识,交换机通过一个routingKey与队列绑定 ,在生
    产者生产消息的时候 指定一个routingKey 当绑定的队列的routingKey 与生产者发送的一样 那么交换机会吧
    这个消息发送给对应的队列。

    image.png

    fanout:

    这种类型的交换机路由规则很简单,只要与他绑定了的队列, 他就会吧消息发送给对应队列(与routingKey
    没关系)


    image.png

    topic:

    这种类型的交换机路由规则也是和routingKey有关 只不过 topic他可以根据:*,#( 号代表过滤一单词,#代
    表过滤后面所有单词, 用.隔开)来识别routingKey 我打个比方 假设 我绑定的routingKey 有队列A和B A的
    routingKey是:
    .user B的routingKey是: #.user
    那么我生产一条消息routingKey 为: error.user 那么此时 2个队列都能接受到, 如果改为 topic.error.user
    那么这时候 只有B能接受到了

    image.png
    image.png

    发布方消息确认机制和失败回调机制

    image.png
    application.yml
    rabbitmq:
        host: 192.168.29.133
        port: 5672
        username: admin
        password: admin
        virtual-host: my_vhost
        publisher-confirms: true
        publisher-returns: true
    
    package com.luban.mall.search.mq;
    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    public class RabbitmqConfig {
        @Autowired
        private ConnectionFactory connectionFactory;
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("directExchange");
        }
        @Bean
        public DirectExchange directExchange2() {
            return new DirectExchange("directExchange2");
        }
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange("topicExchange");
        }
        @Bean
        public Queue queue1() {
            return new Queue("testQueue1", true);
        }
        @Bean
        public Queue queue2() {
            return new Queue("testQueue2", true);
        }
        @Bean
        public Queue queue3() {
            return new Queue("testQueue3", true);
        }
        @Bean
        public Queue queue4() {
            return new Queue("testQueue4", true);
        }
        @Bean
        public Queue queue5() {
            return new Queue("testQueue5", true);
        }
        @Bean
        public Queue queue6() {
            return new Queue("testQueue6", true);
        }
        @Bean
        public Binding binding1(){
            return BindingBuilder.bind(queue1()).to(directExchange()).with("rkey1");
        }
        @Bean
        public Binding binding2(){
            return BindingBuilder.bind(queue2()).to(directExchange()).with("rkey2");
        }
        @Bean
        public Binding binding3(){
            return BindingBuilder.bind(queue3()).to(fanoutExchange());
        }
        @Bean
        public Binding binding4(){
            return BindingBuilder.bind(queue4()).to(fanoutExchange());
        }
        @Bean
        public Binding binding5(){
            return BindingBuilder.bind(queue5()).to(topicExchange()).with("*.user");
        }
        @Bean
        public Binding binding6(){
            return BindingBuilder.bind(queue6()).to(topicExchange()).with("#.user");
        }
        @Bean
        public RabbitTemplate rabbitTemplate(){
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMandatory(true);
            template.setReturnCallback(new MyReturnCallback());
            template.setConfirmCallback(new MyPublisherConfirmCallback());
            return template;
        }
    }
    
    
    package com.luban.mall.search.mq;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    public class MyPublisherConfirmCallback implements RabbitTemplate.ConfirmCallback {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            //通过这个参数可以拿到我们发送的消息对象
            Message message = correlationData.getReturnedMessage();
            if(ack){
                System.out.println("消息发送成功:"+correlationData+"---ack:"+ack+"---cause:"+cause);
            }else{
                System.out.println("消息发送失败:"+correlationData+"---ack:"+ack+"---cause:"+cause);
            }
        }
    }
    
    public class MyReturnCallback implements RabbitTemplate.ReturnCallback {
        @Override
        public void returnedMessage(Message message, int replycode, String replyText, String exchange, String routingkey) {
            System.out.println("error return call back: message"+message+"--replycode:"+replycode+"-- replyText:"+replyText+"--exchange:"+exchange+"---routingkey:"+routingkey);
        }
    }
    
    @RequestMapping(value = "/send5",method = RequestMethod.POST)
        @ApiOperation("发送消息到消息队列")
        @ResponseBody
        @ApiImplicitParams({
                @ApiImplicitParam(name = "name", value = "用户名", defaultValue = "李四",paramType = "query"),
                @ApiImplicitParam(name = "age", value = "年龄", defaultValue = "23", required = true,paramType = "query")
        }
        )
        public CommonResult<String> send5(@RequestParam String name,@RequestParam String age){
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(name,age,"majunweitest callback",correlationData);
            return CommonResult.success("马","返回正确");
        }
    

    消费方ack机制

    image.png

    代码实现

    package com.luban.mall.search.mq;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MyRabbitListenerContainerConif {
        @Bean
        public SimpleRabbitListenerContainerFactory
        simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory){
         SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
        new SimpleRabbitListenerContainerFactory();
       //这个connectionFactory就是我们自己配置的连接工厂直接注入进来
      simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
      //这边设置消息确认方式由自动确认变为手动确认
      simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
      return simpleRabbitListenerContainerFactory;
        }
    
    }
    
    package com.luban.mall.search.controller;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TestListener {
        //@RabbitListener(queues = {"testQueue5","testQueue6"})
        public void reciveMessage(String msg){
            System.out.println("listener....:"+msg);
        }
        @RabbitListener(queues = {"testQueue5","testQueue6"},containerFactory = "simpleRabbitListenerContainerFactory")
        public void getMessage(Message message, Channel channel) throws Exception{
            System.out.println("list manul ack:"+(new String(message.getBody(),"UTF-8")));
            Thread.sleep(5000l);
            //消息消费成功后调用第一个参数是消息的标识字段,第二个是否是批量确认:false不是批量,true是批量
           // channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            //消费失败后调用这个方法告知消息队列最后一个参数:是否返回原队列
           channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
        }
    }
    

    消息预取

    消息预取
    扯完消息确认 我们来讲一下刚刚所说的批量处理的问题
    什么情况下回遇到批量处理的问题呢?
    在这里 就要先扯一下rabbitmq的消息发放机制了
    rabbitmq 默认 他会最快 以轮询的机制吧队列所有的消息发送给所有客户端 (如果消息没确认的话 他会添加一个
    Unacked的标识上图已经看过了)
    那么 这种机制会有什么问题呢, 对于Rabbitmq来讲 这样子能最快速的使自己不会囤积消息而对性能造成影响,
    但是 对于我们整个系统来讲, 这种机制会带来很多问题, 比如说 我一个队列有2个人同时在消费,而且他们处理
    能力不同, 我打个最简单的比方 有100个订单消息需要处理(消费) 现在有消费者A 和消费者B , 消费者A消费一
    条消息的速度是 10ms 消费者B 消费一条消息的速度是15ms ( 当然 这里只是打比方) 那么 rabbitmq 会默认给
    消费者A B 一人50条消息让他们消费 但是 消费者A 他500ms 就可以消费完所有的消息 并且处于空闲状态 而 消费
    }
    // channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    者B需要750ms 才能消费完 如果从性能上来考虑的话 这100条消息消费完的时间一共是750ms(因为2个人同时在
    消费) 但是如果 在消费者A消费完的时候 能把这个空闲的性能用来和B一起消费剩下的信息的话, 那么这处理速
    度就会快非常多。
    这个例子可能有点抽象, 我们通过代码来演示一下
    我往Rabbitmq生产100条消息 由2个消费者来消费 其中我们让一个消费者在消费的时候休眠0.5秒(模拟处理业务
    的延迟) 另外一个消费者正常消费 我们来看看效果:
    正常的那个消费者会一瞬间吧所有消息(50条)全部消费完(因为我们计算机处理速度非常快) 下图是加了延迟
    的消费者:
    可能我笔记里面你看不出效果,这个你自己测试就会发现 其中一个消费者很快就处理完自己的消息了 另外一个消
    费者还在慢慢的处理 其实 这样严重影响了我们的性能了。
    其实讲了这么多 那如何来解决这个问题呢?
    我刚刚解释过了 造成这个原因的根本就是rabbitmq消息的发放机制导致的, 那么我们现在来讲一下解决方案: 消
    息预取
    什么是消息预取? 讲白了以前是rabbitmq一股脑吧所有消息都均发给所有的消费者(不管你受不受得了) 而现在
    是在我消费者消费之前 先告诉rabbitmq 我一次能消费多少数据 等我消费完了之后告诉rabbitmq rabbitmq再给
    我发送数据
    在代码中如何体现?
    在使用消息预取前 要注意一定要设置为手动确认消息, 原因参考上面划重点的那句话。
    因为我们刚刚设置过了 这里就不贴代码了, 完了之后设置一下我们预取消息的数量 一样 是在容器(Container)
    里面设置:

    @Configuration
    public class MyRabbitListenerContainerConif {
        @Bean
        public SimpleRabbitListenerContainerFactory
        simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
                    new SimpleRabbitListenerContainerFactory();
            //这个connectionFactory就是我们自己配置的连接工厂直接注入进来
            simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
            //这边设置消息确认方式由自动确认变为手动确认
            simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            //设置消息预取得数量
            simpleRabbitListenerContainerFactory.setPrefetchCount(2);
            return simpleRabbitListenerContainerFactory;
        }
    
    }
    
    
    @Component
    public class TestListener {
        //@RabbitListener(queues = {"testQueue5","testQueue6"})
        public void reciveMessage(String msg){
            System.out.println("listener....:"+msg);
        }
        @RabbitListener(queues = {"testQueue5","testQueue6"},containerFactory = "simpleRabbitListenerContainerFactory")
        public void getMessage(Message message, Channel channel) throws Exception{
            System.out.println("list manul ack:"+(new String(message.getBody(),"UTF-8")));
            Thread.sleep(5000l);
            //消息消费成功后调用第一个参数是消息的标识字段,第二个是否是批量确认:false不是批量,true是批量
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
            //消费失败后调用这个方法告知消息队列最后一个参数:是否返回原队列
           //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
        }
    }
    
    

    死信交换机

    image.png
    package com.luban.mall.search.mq;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class RabbitmqConfig {
        @Autowired
        private ConnectionFactory connectionFactory;
        
        @Bean
        public DirectExchange deathExchange() {
            return new DirectExchange("deathExchange");
        }
       
        @Bean
        public Queue queue6() {
            Map<String, Object> map = new HashMap();
            //设置消息的过期时间 单位毫秒
            map.put("x-message-ttl", 100000);
            //设置附带的死信交换机
            map.put("x-dead-letter-exchange", "deathExchange");
            //指定重定向的路由建 消息作废之后可以决定需不需要更改他的路由建 如果需要 就在这里指定
            map.put("x-dead-letter-routing-key", "drkey1");
            return new Queue("testQueue6", true,false,false,map);
        }
        @Bean
        public Queue queue7() {
            return new Queue("deathQueue", true);
        }
        @Bean
        public Binding binding7(){
            return BindingBuilder.bind(queue7()).to(deathExchange()).with("drkey1");
        }
    
        @Bean
        public Binding binding6(){
            return BindingBuilder.bind(queue6()).to(topicExchange()).with("#.user");
        }
        @Bean
        public RabbitTemplate rabbitTemplate(){
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMandatory(true);
            template.setReturnCallback(new MyReturnCallback());
            template.setConfirmCallback(new MyPublisherConfirmCallback());
            return template;
        }
    }
    

    怎么保证消息不丢失、不重复消费问题

    https://www.cnblogs.com/jis121/p/11050202.html

    rabbitmq集群搭建,配置

    rabbbitmq由于是由erlang语言开发的 天生就支持分布式
    rabbitmq 的集群分两种模式 一种是默认模式 一种是镜像模式
    当然 所谓的镜像模式是基于默认模式加上一定的配置来的
    在rabbitmq集群当中 所有的节点(一个rabbitmq服务器) 会被归为两类 一类是磁盘节点 一类是内存节点
    磁盘节点会把集群的所有信息(比如交换机,队列等信息)持久化到磁盘当中,而内存节点只会将这些信息保存到内存
    当中 讲白了 重启一遍就没了。
    为了可用性考虑 rabbitmq官方强调集群环境至少需要有一个磁盘节点, 而且为了高可用的话, 必须至少要有2个
    磁盘节点, 因为如果只有一个磁盘节点 而刚好这唯一的磁盘节点宕机了的话, 集群虽然还是可以运作, 但是不能
    对集群进行任何的修改操作(比如 队列添加,交换机添加,增加/移除 新的节点等)
    具体想让rabbitmq实现集群, 我们首先需要改一下系统的hostname (因为rabbitmq集群节点名称是读取
    hostname的)
    这里 我们模拟3个节点 :
    rabbitmq1
    rabbitmq2
    rabbitmq3
    linux修改hostname命令: hostnamectl set-hostname [name]

    修改后重启一下 让rabbitmq重新读取节点名字

    然后 我们需要让每个节点通过hostname能ping通(记得关闭防火墙) 这里 我们可以修改修改一下hosts文件
    关闭防火墙:
    关闭防火墙 systemctl stop firewalld.service 禁止开机自启 systemctl disable firewalld.service
    接下来,我们需要将各个节点的.erlang.cookie文件内容保持一致(文件路径/var/lib/rabbitmq/.erlang.cookie)
    因为我是采用虚拟机的方式来模拟集群环境, 所以如果像我一样是克隆的虚拟机的话 同步.erlang.cookie文件这个
    操作在克隆的时候就已经完成了。
    上面这些步骤完成之后 我们就可以开始来构建集群 了
    我们先让rabbitmq2 加入 rabbitmq1与他构建为一个集群
    执行命令( ram:使rabbitmq2成为一个内存节点 默认为:disk 磁盘节点):
    rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@rabbitmq1 --ram rabbitmqctl start_app
    在构建的时候 我们需要先停掉rabbitmqctl服务才能构建 等构建完毕之后再启动
    我们吧rabbitmq2添加完之后在rabbitmq3节点上也执行同样的代码 使他也加入进去 当然 我们也可以让
    rabbitmq3也作为一个磁盘节点
    当执行完操作以后我们来看看效果:


    image.png

    随便在哪个节点打开管理页面都能看到集群环境各节点的信息;
    有关集群的其他命令:
    rabbitmq-server -detached 启动RabbitMQ节点 rabbitmqctl start_app 启动RabbitMQ应用,而不是节点
    rabbitmqctl stop_app 停止 rabbitmqctl status 查看状态 rabbitmqctl add_user mq 123456 rabbitmqctl
    set_user_tags mq administrator 新增账户 rabbitmq-plugins enable rabbitmq_management 启用
    RabbitMQ_Management rabbitmqctl cluster_status 集群状态 rabbitmqctl forget_cluster_node
    rabbit@[nodeName] 节点摘除 rabbitmqctl reset application 重置
    普通模式的rabbitmq集群搭建好后, 我们来说一下镜像模式
    在普通模式下的rabbitmq集群 他会吧所有节点的交换机信息 和队列的元数据(队列数据分为两种 一种为队列里面
    的消息, 另外一种是队列本身的信息 比如队列的最大容量,队列的名称,等等配置信息, 后者称之为元数据) 进
    行复制 确保所有节点都有一份。
    而镜像模式,则是吧所有的队列数据完全同步(当然 对性能肯定会有一定影响) 当对数据可靠性要求高时 可以使
    用镜像模式
    实现镜像模式也非常简单 有2种方式 一种是直接在管理台控制, 另外一种是在声明队列的时候控制
    声明队列的时候可以加入镜像队列参数 在上方的参数列表当中有解释 我们来讲一下管理台控制
    镜像队列配置命令解释:
    rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
    -p Vhost: 可选参数,针对指定vhost下的queue进行设置 Name: policy的名称 Pattern: queue的匹配模式(正
    则表达式) Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode ha-mode:指明镜像队
    列的模式,有效值为 all/exactly/nodes all:表示在集群中所有的节点上进行镜像 exactly:表示在指定个数的
    节点上进行镜像,节点的个数由ha-params指定 nodes:表示在指定的节点上进行镜像,节点名称通过ha-
    params指定 ha-params:ha-mode模式需要用到的参数 ha-sync-mode:进行队列中消息的同步方式,有效
    值为automatic和manual
    这里举个例子 如果想配置所有名字开头为 policy的队列进行镜像 镜像数量为1那么命令如下:
    rabbitmqctl set_policy ha_policy "^policy_" '{"ha-mode":"exactly","ha-params":1,"ha-sync-
    mode":"automatic"}'

    怎么保证消息不丢失

    消息发布方:
    1.创建队列时,设置队列的持久化
    2.发送消息是设置消息的模式deliveryMode=2,持久化
    3.利用发送方ack机制,发送前在redis中维护id-发送状态[0-未发送,1-已发送,2-发送成功,3-发送失败],时间戳
    2-发送成功的移除集合,对3-发送失败和1-已发送但超过规定时间未ack的消息,开启一个新的线程或任务定期扫描并发送。
    消息接收方:
    1.开启手动确认机制。
    2.消费消息后还未手动确认,系统挂掉了(这种情况不用考虑,消息会从unacked状态重新入队改为redy状态)
    3.消息消费时出现业务异常的情况,手动ack消息失败,放入原队列或者重定向到死信队列。

    怎么保证消息的顺序性

    1.要实现严格的顺序消息,简单且可行的办法就是:保证生产者 - MQServer - 消费者是一对一对一的关系
    2.设定相关的路由键,把强相关的数据分配到同一个队列,一个队列对应一个消费者。
    面试题解答:

    1.rabbitmq
    应用场景:
    质控分组
    
    怎么保证消息不丢失:
    1.搭建高可用集群,设置多个磁盘节点保证元数据不丢失
    2.创建持久化队列,创建镜像队列
    3.发送消息时指定消息是持久化消息
    4.利用消息发送端的消息确认回调机制,比如在redis中维护消息id与发送状态的记录,ack确认发送成功的清除掉,发送失败的,redis中更新失败状态
    5.消费端开启手动确认,业务异常情况 存入数据库,存库失败,放入死信队列。
    
    怎么保证消息不重复消费?
    消息幂等性问题:消费消息时判断该消息是否已经处理过,处理过就不在处理。(比如你存库已经存了那这个时候 再过来这个消息 要么就不处理)
    
    怎么保证消息的有序性?
     一个消息队列只开启一个消费端:或者通过路由间把强关联的消息发送到同一个队列,然后每个消费端跟队列一一对应。
     
     怎么解决消息堆积问题?
     大量消息堆积:
     造成消息堆积有两种原因:
     1.很大原因是消费端挂了或者消费端的处理能力比较差?
      临时借调10倍的机器,部署消费端进行消费
     2.消费端bug问题
    ,如果是消费端bug问题,那就写一个临时的消费消息的应用 把消息释放掉,等bug修复之后,重新再服务端生成消息。
    

    相关文章

      网友评论

          本文标题:4.RabbitMq消息队列

          本文链接:https://www.haomeiwen.com/subject/nehamktx.html