美文网首页
RabbitMQ学习使用

RabbitMQ学习使用

作者: 那就省略号吧 | 来源:发表于2019-10-23 10:29 被阅读0次

    消息队列(MQ)

    • 优点:

    1.解耦:通过 MQ,Pub/Sub 发布订阅消息这么一个模型,将系统A就跟其它系统B\C\D解耦了。
    2.异步:通过MQ,A调用B,C,D各需要200ms,如果调用为串行执行,则需要600ms,如果不是时效性非常强的可以通过MQ来进行异步调用,提高响应速度;
    3.削峰:当某个时间访问量持续飙升,过了这个时间段后又趋于平缓,可以通过mq来进行消息积压慢慢消费,降低对系统的压力;

    • 缺点:

    1.系统可靠性降低:引用外部依赖越多,服务挂机的可能性就越大
    2.系统复杂度提高:消息重复消费问题,消息丢失问题
    3.数据一致性问题:数据是否消费成功

    各消息队列应用对比

    图片.png

    RabbitMQ定义

    RabbitMQ:是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的(摘抄百度)。

    RabbitMQ的常用组件

    borker:rabbitMQ服务器;
    virtual host:虚拟机,一个rabbitMQ服务器可以创建多个虚拟机,每个虚拟机相当于一个服务器,是相互独立的;
    exchange:交换机,用来将生产者生产的消息发送到消息队列,交换机主要分为三类:fanout、direct、topic、headers,已及自定义的交换机;
    queue:消息队列,作为一个容器用来存储生产者推送给消费者的消息,直至被消费者消费;
    routing_key:路由键,生产者需要将消息发送到指定的队列,需要通过指定路由键来完成,是在发送消息的时候要用到的路由键;
    binding_key:路由键,在绑定的时候使用的到路由键,用于消息队列的绑定;
    message:消息主体,为生产者消费者;
    publisher:消息生产者;
    consumer:消息消费者;
    connect:链接,一个链接可以建立多个信道;
    channel:信道,消费者与rabbitMQ服务器之间进行消息交互的通道,由于重复进行TCP的创建和销毁开销太大,并发数会受到系统的限制,容易产生性能瓶颈,信道是在真实TCP链接的基础上建立的虚拟链接,连接数量并不会受到限制,且开销小;

    rabbitMQ

    Exchange类型

    消息路由是从消息生产者生成消息,通过信道传递到交换机,此时消息有一个路由键,路由键会将消息匹配到对应的队列中,消息消费者再从队列中获取消息,在交换机到队列之间的消息路由由于交换机类型的不同会产生不同的路由模式:
    1、fanout:广播形式,消息传递到交换机上,会以广播的形式绑定到所有该交换机绑定的队列中;
    2、direct:消息在从交换机投递到队列是,消息所带的路由键(routing_key)必须与队列所绑定的路由键(binding_key)一致
    3、topic:以一定规则将消息所带的路由键与队列绑定的路由键进行匹配,规则如下:1、以‘.’对路由键的命名根据单词来进行分割,比如:com.rabbitMQ.queue;2、用‘#’或者‘*’来进行模糊匹配,‘*’用于匹配一个单词,‘#’用与匹配多个单词;

    topic消息路由表
    4、header:通过消息头部信息(键值对形式)与交换机跟队列构成的键值对匹配,比较少用;
    (排他性exclusive:排他是基于链接可见,同一个链接下的不同信道,可以同时访问统一链接创建的排他队列,但其他链接不能对起进行访问)

    消息的传送和消费

    消息传送

    1、生产者建立一个tcp链接,并开启一个信道,用于消息的传送;
    2、生产者定义一个交换机,队列,并设置相关属性:如持久化,交换机类型,是否排他等,通过路由键将交换机和队列进行绑定;
    3、生产者发送消息,消息种包含要传送的交换机,路由键信息,通过信道将信息传递到交换机中并通过路由键匹配投递到相应对列中,当消息未投递成功会根据相应的配置的属性值将消息丢弃或回退;
    4、关闭信道,关闭链接;

    消息消费

    1、消费者建立一个tcp链接,并开启一个信道,用于消息的传送;
    2、消费者想rabbitMQ Broder请求相应队列的消息,可设置响应的回调函数;
    3、消费者等到消息投递,消息投递主要有两种模式:1)推模式:通过持续订阅的方式来获取消息;2)拉模式:用于获取单条消息,消费者确认接收并确认消费(ack),消息从响应对列中删除;
    4、关闭信道,关闭链接;

    连接RabbitMQ

      public  Channel  createConnectFactory() throws IOException, TimeoutException {
            Connection connection =null;
            Channel channel =null;
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //当连接失败时,尝试重新连接
            connectionFactory.setAutomaticRecoveryEnabled(true);
            //重试间隔时间(单位:毫秒)
            connectionFactory.setNetworkRecoveryInterval(10000);
            //方法一:
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            try {
                connection = connectionFactory.newConnection();
            }catch(java.net.ConnectException e){
                log.error("连接异常"+e.getMessage());
            }
            //方法二:可以设置多个连接地址,无法连接也不会抛出异常,接着尝试下个连接
            Address [] addresses=new Address[]{new Address("127.0.0.1",5672),new Address("192.168.1.0",5672)};
            connection =connectionFactory.newConnection("amqp://guest:guest@localhust:5672/");
            connectionFactory.newConnection();
            channel = connection.createChannel();
            return channel;
        }
    

    交换机和队列的使用

    public  void initRabbit(){
        Channel channel =null;
        try {
            //创建连接
            channel=createConnectFactory();
            String exchange="exchange";
            String queue="queue";
            String routingKey="routingKey";
            /*声明交换机参数:exchange(交换机名), type(交换机类型), durable(是否持久化)*/
            channel.exchangeDeclare(exchange,"direct",true);
            channel.exchangeDeclare("exchange1","fanout",true);
            /*声明队列参数:queue(队列名), durable(是否持久化), exclusive(是否排他), autoDelete(消息传递后是否自动删除),arguments(其他结构化的参数)*/
            channel.queueDeclare(queue,true,true,false,null);
            /*交换机也可以用于绑定交换机,被绑定的交换机相当于一个队列,对应参数:destination(被绑定的交换), source(携带消息的交换机),routingKey(路由键)*/
            channel.exchangeBind("exchange1",exchange,"exKey");
            /*队列绑定参数:queue(队列名),exchange(交换机名),routingKey(路由键), arguments(其他结构化的参数)*/
            channel.queueBind(queue,exchange,routingKey,null);
            /*内部被绑定的交换机与队列绑定*/
            channel.queueBind("queue1","exchange1","",null);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
    

    针对持久化问题可以通过设置'durable'属性来对交换机和队列进行持久化

    交换机与交换机绑定

    消息发送

    //方法1:
    /*发送消息参数:exchange(交换机名),routingKey(路由键),mandatory(设置消息未被成功路由是否将消息返回给生产者),props(消息设置的参数,也可自定义), body(消息体)*/
    channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_BASIC,message.getBytes());
    //方法2:
    /*自定义消息属性*/
    channel.basicPublish(exchange,routingKey,new AMQP.BasicProperties().builder()
            .contentType("text/plain")
            //消息持久化
            .deliveryMode(2)
            .priority(1)
            .userId("hidden")
            .build(),message.getBytes());
    

    (new AMQP.BasicProperties().builder().deliveryMode(2).build(),对消息持久化)
    1、mandatory:消息发送时设置该属性用于针对消息无法根据自身类型以及路由键找到对应的消费者时将消息回退给生产者(true)中还是丢弃(false);2、immediate:该属性设置true表示当消息根据路由匹配队列上,并未发现任何消费者时,不将消息加入到队列中,而是返回给生产者,尽可能不用,会影响镜像队列的性能

    消息消费

    public void consume(){
            Channel channel=null;
            try {
                //推模式:发布订阅
                channel=createConnectFactory();
                //表示消息在推送给消费者时,一次性不要给消费者推送超过设置的消息数
                channel.basicQos(1);
                Channel channel1=channel;
                String queue="queue";
                //autoAck=false:表示手动确认消息,默认为true
                Boolean autoAck=false;
    //            Channel finalChannel = channel;
                channel.basicConsume(queue,autoAck,new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body)
                            throws IOException
                    {
                        //消息成功接收后告知队列
                        channel1.basicAck( envelope.getDeliveryTag(),false);
                    }
                });
                //拉模式
                GetResponse getResponse = channel.basicGet(queue, autoAck);
                byte[] body = getResponse.getBody();
                //消息成功接收后告知队列:deliveryTag可以看做是消息编号
                channel.basicAck(getResponse.getEnvelope().getDeliveryTag(),false);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    

    注意:autoAck属性用于表示是否确认处理,设置true表示消息发送后,无需等消费者确认消息接口便从队列移除,设置false表示消息发送后,需要等待消费者确认,确认后从队列中移除,未被确认的消息会一致保存在等断开重新连接后重新加入到消息队列进行投递;当消费者需要拒绝消息时,可以通过:channel.basicReject(long deliveryTag, boolean requeue) throws IOException进行拒绝,表示拒绝消息编号为deliveryTag的消息, requeue为false表示将消息从队列中移除,不会重新发送给消费者

    备份交换机(AE:alternate exchange)

    当存在有消息未被成功路由,不想使用mandatory参数且不想消息丢失的情况下可以设置一个备份交换机用来存储这些消息,当需要对这些信息进行处理时去获取:

    String amexchange="amexchange";
    String amQueue="amQueue";
    String amRoutingKey="amRoutingKey";
    String exchange="exchange";
    String queue="queue";
    String routingKey="routingKey";
    Map<String, Object> arguments = new HashMap<>();
    //声明交换机为备份交换机
    arguments.put("alternate-exchange",amexchange);
    channel.exchangeDeclare(amexchange,"fanout",true,false,arguments);
    channel.queueDeclare(amQueue,true,false,false,null);
    channel.queueBind(amQueue,amexchange,null);
    channel.exchangeDeclare(exchange,"direct",true);
    channel.queueDeclare(queue,true,true,false,null);
    channel.queueBind(queue,exchange,routingKey,null);
    
    备份交换机

    客户端会根据交换机所设置的交换机类型,属性进行展示:


    客户端展示

    死信队列(DLX:dead-letter-exchange)

    1、死信队列的形成:1)、消息被拒绝(channel.basicReject(),channel.basicNack()),并且requeue属性为false;2)、消息过期;3)、队列达到最大长度。
    消息过期:可以对通过在声明队列时,定义'x-message-ttl'值来设置所有消息的过期时间或者通过在'new AMQP.BasicProperties().builder().expiration("6000").build()'来对单条消息设置过期时间
    2、死信队列是也是一个正常的队列,通过设置队列的x-dead-letter-exchange参数将队列变为死信队列,当一个非死信队列中存在死信,则会将这条消息自动发送到死信交换机上并路由到死信队列中;

    死信队列
    死信队列客户端

    延迟队列

    延迟对列存储的消息对象为延迟消息,也就是生产者发送的消息并不想立刻发送给消费者,而是想等待一段时间后再进行发送,延迟队列适用场景有定时,订单规定时间支付,可以引用死信对列来实现,消费者所绑定的为死信对列,对每条消息设置过期时间,当消息过期后转到死信队列中供消费者消费;

    优先级队列

    优先级队列,优先级高的队列具有高的优先权,优先级高的消息具有优先被消费的权力,设置队列优先级可以通过设置arguments属性x-max-priority来设置优先权,消息优先权可以通过设置new AMQP.BasicProperties().builder().priority("10").build()来设置优先权

    如何保证数据传输可靠性

    数据丢失环节
    • 消息生产者传输消息至MQ发生数据丢失

    为保证消息成功从生产者通过信道发送至交换机中,rabbitMQ引入两种机制来保证:

    1、事务机制
    public void business() throws IOException {
        Channel channel =null;
        try {
            //创建连接
            channel=createConnectFactory();
            //开启事务
            channel.txSelect();
            String exchange="exchange";
            String queue="queue";
            String routingKey="routingKey";
            channel.exchangeDeclare(exchange,"direct",true);
            channel.queueDeclare(queue,true,true,false,null);
            channel.queueBind(queue,exchange,routingKey,null);
            String message="hello world";
            channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_BASIC,message.getBytes());
            //事务提交
            channel.txCommit();
        } catch (Exception e) {
            channel.txRollback();
            e.printStackTrace();
        }
    }
    

    事务的缺陷:事务的开启会严重影响RabbitMQ的性能,且消息的发送只能等待前一条事务完成后再发送。

    2、发送方确认机制

    发送方消息确认机制是用于解决生产者知悉消息已经成功到达rabbitMQ,消息的确认一般是等消息成功路由到与其匹配的队列中才进行确认(Basic.Ack),如果消息和队列有实现持久化,只消息确认是在消息成功持久化到磁盘中才进行消息确认。确认机制需要将信道设置为confirm(确认)模式,该模式下消息发送时会被自动设置一个唯一ID(从1开始增长),并且消息在确认时返回的信息中会带上该唯一ID;
    相较于事务机制,发送方确认机制最大的优点是可以异步进行,生产者应用程序可以在等待信道返回确认的同时发送消息,等待消息确认之后,生产者应用程序可以回调方法来处理确认消息,即使消息被拒绝(Basic.nack())。

    public void confirm(){
        Channel channel =null;
        try {
            //创建连接
            channel=createConnectFactory();
            //开启确认(confirm)模式
            channel.confirmSelect();
            String amexchange="amexchange";
            String amQueue="amQueue";
            String amRoutingKey="amRoutingKey";
            String exchange="exchange";
            String queue="queue";
            String routingKey="routingKey";
            Map<String, Object> arguments = new HashMap<>();
            //声明交换机为备份交换机
            arguments.put("alternate-exchange",amexchange);
            channel.exchangeDeclare(amexchange,"fanout",true,false,arguments);
            channel.queueDeclare(amQueue,true,false,false,null);
            channel.queueBind(amQueue,amexchange,"");
            channel.exchangeDeclare(exchange,"direct",true);
            channel.queueDeclare(queue,true,true,false,null);
            channel.queueBind(queue,exchange,routingKey);
            String message="hello world";
            channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_BASIC,message.getBytes());
            //监听消息确认
            if (!channel.waitForConfirms()){
                System.out.println("发送消息失败");
            }
            /*监听被拒绝的消息*/
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    

    发送方消息确认机制有:
    1)普通确认(confirm)方法:其实就是串行执行,等一条消息被确认后再发送下一条,与事务机制类似,且性能类似,并没有实现该机制真正的优点,
    2)批量确认(confirm)方法:没发送一批消息后,就调用 channel.waitForConfirms()方法,等待服务器的确认返回。但是其存在消息如果丢失或者超时,已经被拒绝的情况下,需要重发,导致性能更大的消耗。
    3)异步确认(confirm)方法:提供一个回调方法,当服务器确认一条或者多条消息后悔回调该方法进行消息处理,并可以针对确认信息类型的不同来调用不同的方法,比如超时,被拒绝等等。该方法能极大提升服务器性能;
    注意:confirm模式和事务区别在于一个时异步,一个是同步

    • 消息存储在MQ内存中, MQ发生宕机,导致数据丢失

    开启数据持久化
    第一步:创建queue时将其设置为持久化,持久化得为元数据,并不是queue里的数据
    第二步:发送消息的时候将消息的delveryMode设置为2;
    一般情况下数据队列消息持久化与confirm模式结合,只有当消息持久化至硬盘中时,confirm才通知生产者ack,当为持久化时,生产者可根据需要对消息进行重发;

    • 消息消费者从MQ消费消息时,还未消费完服务宕机

    使用手动ack模式,只有当消息被消费者真正消费时,消费者确认后才通知队列将数据进行移除,而不是自动ack模式,当消息到达消费者便通知队列将数据移除,此时有可能数据还为被真正消费,消费者就宕机了;

    相关文章

      网友评论

          本文标题:RabbitMQ学习使用

          本文链接:https://www.haomeiwen.com/subject/ryddyctx.html