美文网首页
RabbitMQ入门和基本模型

RabbitMQ入门和基本模型

作者: Raral | 来源:发表于2021-10-09 15:18 被阅读0次

    RabbitMQ

    安装

    1. 一般来说安装 RabbitMQ 之前要安装 Erlang ,可以去Erlang官网下载。接着去RabbitMQ官网下载安装包,之后解压缩即可。根据操作系统不同官网提供了相应的安装说明:Windows、Debian / Ubuntu、RPM-based Linux、Mac

    如果是Mac 用户,个人推荐使用 HomeBrew 来安装,安装前要先更新 brew:

    brew update
    brew install rabbitmq
    
    1. 启动
      启动很简单,找到安装后的 RabbitMQ 所在目录下的 sbin 目录,可以看到该目录下有6个以 rabbitmq 开头的可执行文件,直接执行 rabbitmq-server 即可,下面将 RabbitMQ 的安装位置以 . 代替,启动命令就是:
    ./sbin/rabbitmq-server
    
    #如果电脑已安装查看目录
    [root@VM-0-2-centos mq]# find / -name rabbitmq-server
    /etc/logrotate.d/rabbitmq-server
    /usr/sbin/rabbitmq-server
    /usr/lib/rabbitmq/bin/rabbitmq-server
    /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/sbin/rabbitmq-server
    /usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
    
    #后台启动
    ./sbin/rabbitmq-server -detached
    
    
    1. 浏览器查看 http://ip:15672/ 登录即可 默认账户密码:guest
    2. 设置用户和密码(https://www.cnblogs.com/whs123/p/14184317.html)
    [root@VM-0-2-centos sbin]# ./rabbitmqctl add_user xx admin123
    Adding user "xxx" ...
    [root@VM-0-2-centos sbin]# ./rabbitmqctl set_user_tags xxxadministrator
    Setting tags for user "xxx" to [administrator] ...
    
    

    rabbitmq使用

    六种消费模型:

    1. P - Q - C(基本消息模型)
      P:生产者,也就是要发送消息的程序

    C:消费者:消息的接受者,会一直等待消息到来。
    queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

    • pom
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.7.2</version>
    </dependency>
    
    • 工具类
    public class RabbitMqUtils {
        private static ConnectionFactory connectionFactory;
        static {
            //重量级类常见,在类加载创建一次
             connectionFactory = new ConnectionFactory();
    
        }
    
        // 定义创建连接
       public static Connection getConnection() {
    
    
           //获取连接对象
           try {
               //创建mq连接工厂对象
               connectionFactory.setHost("x.y.z.j");
               connectionFactory.setPort(5672);
               connectionFactory.setVirtualHost("/gzsz");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
               connectionFactory.setUsername("lg123");
               connectionFactory.setPassword("lg123");
               Connection connection = connectionFactory.newConnection();
               return connection;
           } catch (IOException e) {
               e.printStackTrace();
           } catch (TimeoutException e) {
               e.printStackTrace();
           }
           return null;
       }
    
       //关闭通道,关闭连接
        public static void closeChannel(Channel channel, Connection connection) {
            try {
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
        //
    }
    
    
    • 生产者
    public class Provider {
    
        private final static String QUEUE_NAME = "simple_queue";
    
        public void sendMessage() throws IOException, TimeoutException {
            // 1. 获取链接
            Connection connection = RabbitMqUtils.getConnection();
            // 2. 获取通道对象
            Channel channel = connection.createChannel();
            // 3. 通道绑到对应消息队列
            //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 参数明细
             * 1、queue 队列名称
             * 2、durable 是否持久化,如果持久化,mq重启后队列还在
             * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
             * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
             * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            // 4.通过通道向指定的队列发布消息
            //参数:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * 参数明细:
             * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
             * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
             * 3、props,消息的属性
             * 4、body,消息内容
             */
            String msg = "hello rabbitmq";
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("发送消息:" + msg.getBytes());
            // 5. 关闭通道和链接
            RabbitMqUtils.closeChannel(channel,connection);
        }
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Provider provider = new Provider();
            provider.sendMessage();
        }
    }
    
    • 消费者
    public class Customer {
        private final static String QUEUE_NAME = "simple_queue";
    
        public void acceptMessage() throws IOException, TimeoutException {
            Connection connection = RabbitMqUtils.getConnection();
    
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
    
            //消费消息
            //参数1: 队列名称;参数2:开启消息自动确认机制
            //参数3: 消费完成回调接口
            channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body =" + new String(body));
                }
    
            });
    
    //        channel.close();
    //        connection.close();
    
        }
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Customer customer = new Customer();
            customer.acceptMessage();
        }
    
    }
    

    消息确认机制(ACK)
    通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。

    那么问题来了:RabbitMQ怎么知道消息被接收了呢?

    如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

    因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

    自动ACK:消息一旦被接收,消费者自动发送ACK

    手动ACK:消息接收后,不会发送ACK,需要手动调用

    大家觉得哪种更好呢?

    这需要看消息的重要性:

    如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便

    如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

    我们之前的测试都是自动ACK的,如果要手动ACK,需要改动我们的代码:
    消费者:

    public class Customer {
        private final static String QUEUE_NAME = "simple_queue";
    
        public void acceptMessage() throws IOException, TimeoutException {
            Connection connection = RabbitMqUtils.getConnection();
    
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
    
            //消费消息
            //参数1: 队列名称;参数2:开启消息自动确认机制
            //参数3: 消费完成回调接口
            channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者: body =" + new String(body));
                    // 手动进行ACK
                    /*
                     *  void basicAck(long deliveryTag, boolean multiple) throws IOException;
                     *  deliveryTag:用来标识消息的id
                     *  multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
                     */
    
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
    
            });
    
    //        channel.close();
    //        connection.close();
        }
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Customer customer = new Customer();
            customer.acceptMessage();
        }
    
    }
    
    1. p -q- [c1, c2 ] (work消息模型,竞争消息模型)
      P:生产者:任务的发布者

    C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)

    C2:消费者2:领取任务并且完成任务,假设完成速度较快

    • 生产者
    public class Provider {
        private final static String QUEUE_NAME = "work_queue";
    
        public static void main(String[] args) throws IOException {
            Connection connection = RabbitMqUtils.getConnection();
            Channel channel = connection.createChannel();
            //通道声明队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            //发布消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("",QUEUE_NAME, null, ("task..." + i).getBytes());
                System.out.println("生产者-发送消息:"+ "task..." + i);
            }
            RabbitMqUtils.closeChannel(channel,connection);
        }
    }
    
    
    • 消费者1
    public class Customer1 {
        private final static String QUEUE_NAME = "work_queue";
        public void acceptMessage() throws IOException, TimeoutException {
            Connection connection = RabbitMqUtils.getConnection();
    
            Channel channel = connection.createChannel();
    //        channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    
            channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1:" + new String(body));
                    //手动确认
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
    
            });
    
    //        channel.close();
    //        connection.close();
    
        }
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Customer1 customer1 = new Customer1();
            customer1.acceptMessage();
            System.out.println("消费者1已启动");
        }
    }
    
    
    • 消费者2
    public class Customer2 {
        private final static String QUEUE_NAME = "work_queue";
        public void acceptMessage() throws IOException, TimeoutException {
            Connection connection = RabbitMqUtils.getConnection();
    
            Channel channel = connection.createChannel();
            // 不平均分配
    //        channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    
    
            //参数2:消息自动确认
            channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消费者2:" + new String(body));
    //                channel.basicAck(envelope.getDeliveryTag(), false);
                }
    
            });
    
    //        channel.close();
    //        connection.close();
    
        }
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Customer2 customer2 = new Customer2();
            customer2.acceptMessage();
            System.out.println("消费者2已启动");
        }
    }
    

    能者多劳
    刚才的实现有问题吗?

    消费者1比消费者2的效率要低,一次任务的耗时较长

    然而两人最终消费的消息数量是一样的

    消费者1大量时间处于空闲状态,消费者2一直忙碌

    现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
    怎么实现呢?

    通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。

    值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效。

    public class Customer1 {
        private final static String QUEUE_NAME = "work_queue";
        public void acceptMessage() throws IOException, TimeoutException {
            Connection connection = RabbitMqUtils.getConnection();
    
            Channel channel = connection.createChannel();
            //prefetchCount在手动ack的情况下才生效,自动ack不生效
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    
            channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1:" + new String(body));
                    //手动确认
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
    
            });
    
    //        channel.close();
    //        connection.close();
    
        }
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Customer1 customer1 = new Customer1();
            customer1.acceptMessage();
            System.out.println("消费者1已启动");
        }
    }
    

    订阅模型分类
    说明下:

    1、一个生产者多个消费者
    2、每个消费者都有一个自己的队列
    3、生产者没有将消息直接发送给队列,而是发送给exchange(交换机、转发器)
    4、每个队列都需要绑定到交换机上
    5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者消费
    例子:注册->发邮件、发短信

    X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

    Exchange类型有以下几种:

    Fanout:广播,将消息交给所有绑定到交换机的队列

    Direct:定向,把消息交给符合指定routing key 的队列

    Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

    Header:header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

    Header模式不展开了,感兴趣可以参考这篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131
    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

    1. P-X-[q1,q2]-[c1,c2] (Publish/subscribe(交换机类型:Fanout,也称为广播 ))
      和前面两种模式不同:

    1) 声明Exchange,不再声明Queue

    2) 发送消息到Exchange,不再发送到Queue

    • 生产者
    public class Provider {
        private final static String EXCHANGE_NAME = "test_fanout_exchange";
    
        public static void main(String[] args) throws IOException {
            Connection connection = RabbitMqUtils.getConnection();
            Channel channel = connection.createChannel();
            // 通道声明到交互机
            //参数1: 交互机名称; 参数2: 交换机类型: fanout
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String msg = "注册成功";
            channel.basicPublish(EXCHANGE_NAME,"", null, msg.getBytes());
    
            //发布消息
    //        for (int i = 1; i <= 10; i++) {
    //            channel.basicPublish("logs","", null, (i + "fanout").getBytes());
    //
    //        }
            System.out.println("生产者:发送消息:" + msg.getBytes());
            RabbitMqUtils.closeChannel(channel,connection);
        }
    }
    
    
    • 消费者1
    public class Customer1 {
        private final static String EXCHANGE_NAME = "test_fanout_exchange";// 交换机
        private final static String QUEUE_NAME = "fanout_exchange_queue_sms";// 发送短信队列
        public void acceptMessage() throws IOException, TimeoutException {
            Connection connection = RabbitMqUtils.getConnection();
            Channel channel = connection.createChannel();
    //        channel.exchangeDeclare("logs","fanout");
    //        String queueName = channel.queueDeclare().getQueue();
            //声明队列名称
            channel.queueDeclare(QUEUE_NAME, false,false,false,null);
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
            channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1【短信服务】:" + new String(body));
                }
            });
    
        }
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Customer1 customer1 = new Customer1();
            customer1.acceptMessage();
        }
    }
    
    
    
    • 消费者2
    public class Customer2 {
        private final static String EXCHANGE_NAME = "test_fanout_exchange";// 交换机
        private final static String QUEUE_NAME = "fanout_exchange_queue_email";// 发送邮件队列
        public void acceptMessage() throws IOException, TimeoutException {
            Connection connection = RabbitMqUtils.getConnection();
            Channel channel = connection.createChannel();
    //        channel.exchangeDeclare("logs","fanout");
    //        String queueName = channel.queueDeclare().getQueue();
            //声明队列名称
            channel.queueDeclare(QUEUE_NAME, false,false,false,null);
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
            channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2【邮件服务】:" + new String(body));
                }
            });
    
        }
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Customer2 customer1 = new Customer2();
            customer1.acceptMessage();
        }
    }
    
    
    

    思考
    1、publish/subscribe与work queues有什么区别。

    区别:

    1)work queues不用定义交换机,而publish/subscribe需要定义交换机。

    2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。

    3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实际上work queues会将队列绑定到默认的交换机 。

    相同点:

    所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
    2、实际工作用 publish/subscribe还是work queues。

    建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大(也可以做到同一队列竞争),并且发布订阅模式可以指定自己专用的交换机。

    分享两道面试题
    面试题:

    避免消息堆积?

    1) 采用workqueue,多个消费者监听同一队列。

    2)接收到消息以后,而是通过线程池,异步消费。

    如何避免消息丢失?

    1) 消费者的ACK机制。可以防止消费者丢失消息。

    但是,如果在消费者消费之前,MQ就宕机了,消息就没了?

    2)可以将消息进行持久化。要将消息持久化,前提是:队列、Exchange都持久化

    接下来几种模式参考

    相关文章

      网友评论

          本文标题:RabbitMQ入门和基本模型

          本文链接:https://www.haomeiwen.com/subject/gnpqoltx.html