美文网首页
RabbitMQ读书笔记

RabbitMQ读书笔记

作者: zouhao1985 | 来源:发表于2020-07-19 16:23 被阅读0次

1.RabbitMQ的概述与重要概念

RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。
MQ的主要用途包含如下:

  • 异步处理,比如注册发送邮件和短信
  • 应用解耦,比如电商系统中,订单服务和库存服务解耦
  • 流量削峰,比如秒杀,抢红包活动

重要的概念可以通过如下图进行了解:


Rabbit模型.png
  • Message 由消息头和消息体组成,消息头由一组可选属性组成,消息体是不透明的
  • Publisher 消息的生产者,表示一个向Exchange发布消息的客户端应用程序
  • Exchange 交换器,接收Publisher发送的消息并路由给Queue
  • Binding 绑定,关联Exchange和Queue
  • Queue 消息队列,用于保存消息直到发送给Consumer,Message会一直在队列里直至被消费者取走
  • Connection TCP连接
  • Channel 信道,是建立在真实的TCP连接里的虚拟连接,对于操作系统来说建立和销毁一次TCP是非常昂贵的开销,因为引入信道来复用一条TCP连接
  • Consumer 消费者,表示从一个Queue中取得消息的一个客户端应用程序
  • Broker RabbitMQ服务器实体
  • Virtual Host 虚拟主机,表示一个mini版的RabbitMQ服务器,拥有自己的Queue、Exchange、Binding和权限机制,默认的vhost是/,必须在连接时指定

MQ对比

MQ对比.png

2.RabbitMQ Window安装介绍

RabbitMQ安装依赖Erlang,因此安装之前需要先安装Erlang环境,如下:

otp_win64_22.3.exe Erlang的Window安装包,安装包中没有Erlang关键字
rabbitmq-server-3.8.3.exe

Erlang和RabbitMQ有对应的版本关系,请点击查看官网信息

Erlang和RabbitMQ版本对应关系部分截图.png

3.插件安装

软件安装完成之后,需要安装管理界面插件。打开RabbitMQ Command Prompt命令行界面,输入如下命令:

rabbitmq-plugins enable rabbitmq_management

安装完成之后,打开网址:http://localhost:15672/
默认账号为guest,密码为guest,进入系统之后创建admin账号,并修改guest密码

4.RabbitMQ 运维篇

4.1 单机模式(开发测试环境推荐)

单机模式参考Window安装即可(暂不提供Linux版本)。

4.2 普通集群模式

组成集群需要两步操作,该操作同样适用于镜像模式

  1. 该模式下需要保证不同机器之间的erlang cooike一致,可将其中一台机器的erlang cookie拷贝到其他机器上。
  2. 将节点加入集群,假如有三个节点,可在节点2,3两台机器上加入到节点1,如下:
rabbitmqctl stop_app 
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@node1
rabbitmqctl start_app

查看集群状态 rabbitmqctl cluster_status

普通集群模式中每个节点都有相同的元数据,即相同的队列结构。但是消息(实际数据)只存在其中一个节点上,因此若消费者连接到非数据节点的时候,消息会先传递给消费者连接的节点,再提供给消费者。因此该模式下由两个重要的特点,1. 若存储消息节点宕机了,整个集群不可用,因此此模式并非高可用;2. 节点之间可能存在大量的数据传递,占用带宽高。即使如此,若使用此种模式,客户端应尽快均匀散布到各个节点上。

原理图如下:


RabbitMQ普通集群.png
4.3 镜像集群模式(生产环境必须)

在创建普通集群的基础上,设置策略(policy),该操作可通过web ui设置,如下:


policy设置.png

也可以通过命令设置

// 为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

该模式是一个HA方案(高可用方案),RabbitMQ是没有中心的,不会因为一个节点挂了导致整个集群不可用,解决了普通模式中的问题,与普通模式不同的是,消息会主动在镜像节点之间同步,而不是在客户端获取数据时再拉取数据。
因数据在不同节点之间主动同步,因此带宽要求更高,降低了系统的性能。这种模式适合对消息可靠性要求较高的场合中使用。
原理图如下:


RabbitMQ镜像集群.png

5.RabbitMQ 实战篇

5.1 管理规范
5.1.1 命名规范

exchange:以ex开头,规则为ex.业务域.应用名称.消息类型

ex.businame.appname.msgtype

queue:以q开头,规则为q.业务域.应用名称.消息类型

q.businame.appname.msgtype
5.1.2 用户管理规范
  • 提供给应用使用的用户类型为none
  • 只授权用户特定的exchange(写)和queue(读)访问权限,这样代码就无法创建交换器和队列
5.1.3 其他规范
  • 队列和交换器由MQ管理员与研发人员沟通规则后,统一由MQ管理员进行创建。
  • 代码中禁止进行创建交换器和队列的操作(若用户管理规范,此操作无法执行)。
5.2 环境准备
  1. maven依赖配置
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.0.0</version>
</dependency>
  1. 新建一个标准的maven结构工程,编写MQ工具类
public class ConnectionUtil {

    public static Connection get(String username, String pwd) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername(username);
        factory.setPassword(pwd);
        try {
            return factory.newConnection();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

}
5.3 应用实例

abbitMQ常用的Exchange Type有三种

  1. direct 当消息的Routing key与Binding key完全匹配时,将消息路由到Queue中
  2. fanout 将消息广播到与Exchange绑定的所有Queue,效率最高
  3. topic Binding key使用模式,“#”匹配一个或多个词,“*”只匹配一个词,当消息的Routing key模糊匹配该模式才进行路由
5.3.1 Direct模式
public class DirectSend {

    public static final String EXCHANGE_NAME = "exchange-test-direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.get("producer-a", "producer-a");
        Channel channel = connection.createChannel();
//        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String msg = "哈哈123";
        channel.basicPublish(EXCHANGE_NAME, "delete", null, msg.getBytes("utf-8"));
        System.out.println("[X] send: " + msg);

        channel.close();
        connection.close();
    }

}
public class DirectRec2 {

    //    public static final String EXCHANGE_NAME = "exchange-test-direct";
    public static final String QUEUE_NAME = "queue-test-direct-2";

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
        final Channel channel = connection.createChannel();
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
//        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
//        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.basicQos(1);  // 同一时刻服务器只会发送一条消息给消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[Y2] receive msg: " + msg);
                //休眠
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}
5.3.2 Fanout模式
public class SubscribeSend {

    public static final String EXCHANGE_NAME = "exchange-test-fanout-01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.get("producer-a", "producer-a");
        Channel channel = connection.createChannel();
//        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String msg = "hello world";
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes("utf-8"));
        System.out.println("[X] send: " + msg);

        channel.close();
        connection.close();
    }

}
public class SubscribeRec2 {
    public static final String QUEUE_NAME = "queue-test-fanout-02";

    //    public static final String EXCHANGE_NAME = "exchange-test-01";
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
        final Channel channel = connection.createChannel();
//        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[Y2] receive msg: " + msg);
                //休眠
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}
5.3.3 Topic模式
public class TopicSend {

    private final static String EXCHANGE_NAME = "exchange-test-topic";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.get("producer-a", "producer-a");
        Channel channel = connection.createChannel();

        // 声明exchange
//        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息内容
        String message = "Hello World!!";
        channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }

}
public class TopRec1 {

    public static final String QUEUE_NAME = "queue-test-topic-01";

//    public static final String EXCHANGE_NAME = "exchange-test-topic";

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
        final Channel channel = connection.createChannel();
//        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[Y1] receive msg: " + msg);
                //休眠
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}
5.3.4 ACK消息确认(推荐使用方式)
public class MultMqSend {

    private final static String EX_NAME = "exchange-test-ack-01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.get("producer-a", "producer-a");
        Channel channel = connection.createChannel();
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "hello";
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(EX_NAME, "rk", null, (msg + i).getBytes("UTF-8"));
            System.out.println("[X] send " + (msg + i));
        }
        channel.close();
        connection.close();
    }

}
public class MultiMqRecManualConfirm1 {

    private final static String QUEUE_NAME = "queue-test-ack-01";

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
        final Channel channel = connection.createChannel();
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[Y1] receive msg: " + msg);
                //休眠
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
        channel.basicConsume(QUEUE_NAME, false, consumer);

    }


}
public class MultiMqRecManualConfirm2 {

    private final static String QUEUE_NAME = "queue-test-ack-01";

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
        final Channel channel = connection.createChannel();
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[Y1] receive msg: " + msg);
                //休眠
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
        channel.basicConsume(QUEUE_NAME, false, consumer);

    }


}

confirm模式解决了公平轮训的问题,哪个消费者处理更快,处理的消息更多(能者多劳)。这个案例解决了消费者消费消息可靠性问题,但是没有解决发送者发送消息可靠性问题。

5.3.5 basicQo和basicAck关系

两者是配套使用的。

// channel.basicQos(1)指该消费者在接收到队列里的消息但没有返回确认结果之前,
// 队列不会将新的消息分发给该消费者。队列中没有被消费的消息不会被删除,还是存在于队列中。
channel.basicQos(1);  
// 确认消息
channel.basicAck(envelope.getDeliveryTag(), false);

相关文章

网友评论

      本文标题:RabbitMQ读书笔记

      本文链接:https://www.haomeiwen.com/subject/xlmxkktx.html