美文网首页
RabbitMQ--初步了解

RabbitMQ--初步了解

作者: snoweek | 来源:发表于2017-08-13 15:24 被阅读53次

    RabbitMQ是什么?

    rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输,即接收和发送消息。

    RabbitMQ工作原理


    这个系统架构图版权属于sunjun041640。
    下面来介绍RabbitMQ里的一些基本定义,主要如下:
    RabbitMQ Server:提供消息一条从Producer到Consumer的处理。
    Exchange:一边从发布者方接收消息,一边把消息推送到队列。
    producer只能将消息发送给exchange。而exchange负责将消息发送到queues。Procuder Publish的Message进入了exchange,exchange会根据routingKey处理接收到的消息,判断消息是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的主要的type有direct,topic,headers,fanout。具体针对不同的场景使用不同的type。
    queue也是通过这个routing keys来做的绑定。交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。
    Queue:消息队列。接收来自exchange的消息,然后再由consumer取出。exchange和queue可以一对一,也可以一对多,它们的关系通过routingKey来绑定。
    Producer:Client A & B,生产者,消息的来源,消息必须发送给exchange。而不是直接给queue
    Consumer:Client 1,2,3消费者,直接从queue中获取消息进行消费,而不是从exchange中获取消息进行消费。
    还有一些在上图中没有显示,但在应用程序中会用到的定义。
    Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。
    Channels: **虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。
    Bindings:绑定(binding)是指交换机(exchange)和队列(queue)进行关联。可以简单理解为:这个队列(queue)对这个交换机(exchange)的消息感兴趣。

    Hello,World

    下面将从最简单的应用开始简单了解一下RabbitMQ的使用。
    生产者

    import com.rabbitmq.client.*;
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    public class Send {
        private final static String QUEUE_NAME = "hello";
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            //声明一个TCP连接
            Connection connection = factory.newConnection();
            //声明一个channel
            Channel channel = connection.createChannel();
            BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
            boolean flag = true;
            while(flag){
                System.out.print("往交换机中推送消息:");
                String message = input.readLine();
                if (message == "quit"){
                    flag = false;
                    continue;
                }
                //往exchange中推送消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
            channel.close();
            connection.close();
        }
    
    }
    

    消费者

    import com.rabbitmq.client.*;
    import java.io.IOException;
    public class Recv {
        private final static String QUEUE_NAME = "hello";
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages.");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    首先列举一下几个重要声明的参数

    1. 队列声明queueDeclare
    queueDeclare(String queue, boolean durable, boolean exclusive, boolean
     autoDelete, Map<String, Object> arguments)
    
    1. 生产者发送消息basicPublish
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
    
    1. 消费者消费basicConsume
      消费者到队列中获取消息进行消费。autoAck=true表示消费者收到这个消息就会向队列发送确认ack,表示自己收到了,队列接到反馈后,就会把这条消息从队列中删除,callback表示消费者收到消息后,对消息进行的处理。
    basicConsume(String queue, boolean autoAck, Consumer callback)
    

    上述两段代码,有一下几点要说明。
    1、从Send.java 代码中可以看出并没有声明架构图中的exchange,而是方法basicPublish中使用了空字符串 (""),这代码使用了默认/匿名交换机。
    2.默认交换机有一个特殊的地方,那就是生产者和消费者会公用一个队列。 在Send.java的basicPublish方法中,routingKey参数的位置我们给了一个队列名,默认交换机会把推给他的消息直接再推到相应名称的队列中。在消费者中,声明了队列后,也不需要和交换机进行绑定。消费者会直接到相应队列获取消息。所以我们的设计看起来如下所示(注意只是看起来):

    运行生产者和消费者,运行结果如下:



    第一个消费者



    第二个消费者

    从运行结果中可以看出,RabbitMQ 会顺序循环的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin。

    消息确认

    这里就会有一个问题,在上述消费者代码中,我们设置autoAck=true,即消费者收到这条消息就通知队列将其删除。然后才会去处理这条消息。
    如果consumer处理消息的时候突然挂了,消息还没有被完成,那么这条消息就会消失。为了演示信息丢失,将消费者代码改成如下

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * Created by sunyan on 17/8/1.
     */
    public class Recv {
        private final static String QUEUE_NAME = "hello";
    
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages.");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    try {
                        Thread.sleep(10000);
                        System.out.println(" [x] Received '" + message + "'");
                    } catch (InterruptedException _ignored) {
                        Thread.currentThread().interrupt();
                    }finally {
                        System.out.println(" [x] Done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
    
    

    在消息输出前,sleep5秒,下面,运行两个消费者,并在发送第一条消息后,将第一个消费者关闭,


    从运行结果可以看出,一共丢失了两条消息。为了不让消息丢失,RabbitMQ
    提供了消息确认机制,consumer在接收到,执行完消息后会发送一个ack给RabbitMQ告诉它可以从queue中移除消息了。如果没收到ack。Rabbitmq会重新发送此条消息,如果有其他的consumer在线,将会接收并消费这条消息。消息确认机制是默认打开的。如果想关闭只需要按如下设置 即可。

    channel.basicConsume(QUEUE_NAME, false, consumer);
    


    从上图可以看出,第一个消费者退出后,原本轮询分发给他的数据又给了第二个消费者,并没有信息丢失。
    如果处理完成后忘记发送ack,那么即使已经被处理的信息,RabbitMQ不会将它删除,且在消费者退出后,将消息继续发送给其他消费者进行重复处理。
    把上段代码中的相应内容注释掉。

    channel.basicAck(envelope.getDeliveryTag(), false);
    


    从上面三张图可以看出,已经处理过的消息h1,h2又被处理了一遍。忘记basicAck是一个常见的错误,但后果是严重的。因为RabbitMQ无法删除任何消息,将会消耗越来越多的内存。

    公平派遣

    看到这里,你可能已经觉得轮询分发有点问题了,如果一个消费者处理一个问题要10s,另一个处理只需瞬间,那么一个消费者将不断忙碌,另一个消费者几乎不会做任何工作。但RabbitMQ不知道什么,还会平均分配消息。这是因为当消息进入队列时,RabbitMQ只会分派消息。它不看消费者的未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。我们可以使用basicQos方法与 prefetchCount = 1设置。这告诉RabbitMQ在消费者返回一个处理消息的ack之前不要再给他发送新消息。相反,它将发送到下一个还不忙的消费者。
    一个消费者设置

     Thread.sleep(10000);
    

    另一个消费者不设置,让其立即处理消息。运行结果如下


    从图中可以看出,原本应该有Recv处理的消息“hello3”,由Recv1处理了。

    消息持久化

    消息确认中讲了如何确保即使消费者死亡,消息也不会丢失。但是如果RabbitMQ服务器停止,消息仍然会丢失。
    当RabbitMQ退出或崩溃时,它会忘记队列和消息,需要两件事来确保消息不会丢失:我们需要将队列和消息标记为耐用。
    首先,我们需要确保RabbitMQ不会失去我们的队列。为了这样做,我们需要将其声明为持久的:

    channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
    

    虽然这个命令本身是正确的,但是在我们目前的设置中是不行的。这是因为我们已经定义了一个 不耐用的名为hello的队列。RabbitMQ不允许您重新定义具有不同参数的现有队列,并会向尝试执行此操作的任何程序返回错误。但是有一个快速的解决方法 - 用不同的名称声明一个队列。

    channel.queueDeclare("hello_world",durable,false,false,null);
    

    消息持久化
    需要通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN来标记我们的消息。
    总结一下,为了数据不丢失,我们采用了:

    1. 在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。
    2. 持久化queue,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。
    3. 持久化Message,理由同上。

    总结

    在本篇中,主要介绍了一下默认交换机,在默认交换机下,一条消息只能分发给一个消费者。那如果有很多不同的消费者都对这条消息的话,默认交换机就无法实现了?那么具体该怎么实现,将在RabbitMQ--交换机中进行说明。

    相关文章

      网友评论

          本文标题:RabbitMQ--初步了解

          本文链接:https://www.haomeiwen.com/subject/alajrxtx.html