美文网首页
RabbitMQ操作指南

RabbitMQ操作指南

作者: s1991721 | 来源:发表于2021-04-09 14:20 被阅读0次

    安装

    服务器安装CentOS7.3,在此服务器上安装RabbitMQ

    这里采用rpm的安装方式,准备的安装包如下

    RabbitMQ是Erlang语言开发的因此安装顺序为:erlang->socat->rabbmitmq

     rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
    
     rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
    

    安装socat时出现异常

    原因是与系统本地库有冲突

    rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps 采取强制安装
    

    最后安装rabbitmq

    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
    

    安装成功后会有命令提示

    修改配置信息

    放开guest用户

    开启控制台插件,易于控制台操作

    rabbitmq-plugins enable rabbitmq_management
    

    启动RabbitMQ

    rabbitmq-server start &
    

    浏览器输入IP:15672,但是毫无反馈

    这时还要修改下firewall

    firewall-cmd --zone=public --add-port=15672/tcp --permanent 
    firewall-cmd --zone=public --add-port=5672/tcp --permanent 
    
    systemctl restart firewalld.service
    

    CentOS7数据服务器的搭建文内有firewall参数介绍

    成功访问

    关闭RabbitMQ

    rabbitmqctl stop
    

    开机自启动

    chkconfig rabbitmq-server on
    

    五种交换机,六种分发模式

    官网解释

    RabbitMQ各部件间的关系
    • P是生产者负责产生消息
    • X是交换机负责投递消息
    • 红色部分为队列缓冲,用于存储传过来的消息
    • C为消费者,从缓冲队列中取出消息进行消费

    P产生消息只需知道要投递的X即可,后续过程不关心
    C从指定的队列中取消息消费也无需关心消息的产生过程
    X负责RabbitMQ内部消息的流转,重任都在X上

    交换机

    1、Direct交换机

    消息过来时指定了routingkey,因此只有完全相同routingkey的队列才能够收到消息

    2、Topic交换机

    消息过来时指定了routingkey其组成由“.”分隔,通过通配符匹配到对应routingkey的队列,类似于SQL中的模糊匹配

    通配符有:

    • ‘*’代表一个单词
    • ‘#’代表零个或多个单词

    3、Fanout交换机

    将收到的所有消息,发送到所有绑定的队列中(广播)

    4、Headers交换机

    消息传递过程中不采用routingkey,采用Properties取代。

    在P端channel发送消息时设置Properties的键值对属性,在C端绑定队列时设置对应的Properties属性

    P:
    channel.basicPublish(EXCHANGE_NAME,routingkey , properties, message);
    
    C:
    channel.queueBind(queueName, EXCHANGE_NAME, routingkey, properties);
    
    注:P端与C端properties类型不同,routingkey为“”
    

    5、System Default交换机

    系统默认存在的交换机,规则同Direct交换机

    分发模式

    上述5种交换机,常用的为前三种,根据场景的需求不同共有如下几种分发模式

    代码通用模板,下述只列出差异代码块
    
    ConnectionFactory factory = new ConnectionFactory();  
    // 设置链接地址
    factory.setHost("127.0.0.1");  
    // 创建连接  
    Connection connection = factory.newConnection();  
    // 创建频道  
    Channel channel = connection.createChannel();  
    
    P端:
    
    // 指定队列  
    channel.queueDeclare(QUEUE_NAME, Durable, false, false, null);  
    // 发送的消息  
    String message = "hello world!";  
    // 发送消息  
    channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, Properties, message.getBytes());  
    // 关闭频道和连接  
    channel.close();  
    connection.close();  
    
    C端:
    
    // 声明队列,防止消息接收者先运行,队列不存在。  
    channel.queueDeclare(QUEUE_NAME, Durable, false, false, null);  
    // 创建队列消费者  
    QueueingConsumer consumer = new QueueingConsumer(channel);  
    // 指定消费队列  AutoACK表示是否自动签收
    channel.basicConsume(QUEUE_NAME, AutoACK, consumer);  
    while (true) {  
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
        String message = new String(delivery.getBody());  
    }
    

    1、简单分发模式

    一个P一个C,采用Direct交换机,无特殊要求可默认使用System Default交换机,指明routingkey即可送达

    P:
    channel.basicPublish("", QUEUE_NAME, null, message); 
    
    C:
    channel.basicConsume(QUEUE_NAME, true, QueueingConsumer); 
    

    2、工作队列模式

    一个P多个C,采用Direct交换机(默认即可)将消息存入队列,多个C监听队列消息

    消息丢失

    由于存在多个消费者,若一个消费者在处理过程中死掉,则处理的消息便会被丢失,因此不能够自动签收

    负载均衡

    场景:奇数任务繁重,偶数任务简单;在存在2个C的情况下,就会累死一个C,所以需要保证C在完成任务后再接收新消息

    P:
    channel.basicPublish("", QUEUE_NAME, null, message);
    
    C:
    channel.basicQos(1);// 负载均衡,每次只处理一条消息
    channel.basicConsume(QUEUE_NAME, false, QueueingConsumer); 
    

    3、广播模式

    采用Fanout交换机,将消息复制发送到每一个队列中

    创建交换机时选择类型
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    

    4、路由模式

    前几种模式仅声明了交换机和队列的关系,绑定还可以有额外的routingkey参数,采用Direct交换机。

    消息到达X后,根据routingkey进行复制分发

    声明 routingkey
    channel.queueBind(queueName, EXCHANGE_NAME,routingkey);
    

    5、主题模式

    与路由模式类似,区别在于routingkey。主题模式下routingkey是由‘.’构成的字符串。

    由于Topic交换机的特性,将消息复制发送到不同的队列中

    P:
    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
    C:
    channel.queueBind(queueName, EXCHANGE_NAME, routingkey);
    

    6、RPC模式

    RPC模式在RabbitMQ中没有明确的P与C,因为双方都在发送和接收消息

    重点:

    • C端与S端都需要绑定队列以获取对应消息
    • 消息在传递过程中始终包含两个属性correlationId和replyTo
      -- correlationId用以标识请求和结果的对应关系
      -- replyTo标识结果要去的队列
    • 此模式下可以添加多个C端,以提升处理性能,通过basicQos控制负载
    主子线程间数据传递

    以上六种消息的分发模式都要根据场景设置容错和消息补偿机制

    代码实战

    work工作模式未设置prefetch1 work工作模式设置prefetch1 手动确认

    相关文章

      网友评论

          本文标题:RabbitMQ操作指南

          本文链接:https://www.haomeiwen.com/subject/cknlihtx.html