MQ之RabbitMQ

作者: inke | 来源:发表于2017-06-08 15:49 被阅读145次
    • 更新记录:
      • 2017.07.18 增加 MAC 安装方式。
        [toc]
        简书不支持 toc 目录模式,截图一张。


        image.png
        image.png

    简介

    MQ为Message Queue,消息队列是应用程和应用程序之间的通信方法。
    RabbitMQ是一个开源的,在AMQP基础上完整的,可复用的企业消息系统。
    支持主流的操作系统,Linux、Windows、MacOX等。
    多种开发语言支持,Java、Python、Ruby、.NET、PHP、C/C++、node.js等

    AMQP

    AMQP是消息队列的一个协议。
    AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。

    RabbitMQ官网地址http://www.rabbitmq.com/

    Linux安装RabbitMQ

    安装Erlang

    添加yum支持

    在线安装:

    cd /usr/local/src/
    mkdir rabbitmq
    cd rabbitmq
    
    wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
    rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
    
    rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
    
    sudo yum install erlang
    

    离线安装:

    centos相关资源:
    http://omj2w5bt7.bkt.clouddn.com/rabbitmq-server-3.4.1-1.noarch.rpm
    http://omj2w5bt7.bkt.clouddn.com/esl-erlang-compat-R14B-1.el6.noarch.rpm
    http://omj2w5bt7.bkt.clouddn.com/esl-erlang_17.3-1centos6_amd64.rpm
    http://omj2w5bt7.bkt.clouddn.com/esl-erlang-17.3-1.x86_64.rpm

    windows相关资源:
    http://omj2w5bt7.bkt.clouddn.com/rabbitmq-server-3.4.1.exe
    http://omj2w5bt7.bkt.clouddn.com/otp_win32_17.4.exe
    http://omj2w5bt7.bkt.clouddn.com/otp_win64_17.3.exe

    
    上传esl-erlang_17.3-1~centos~6_amd64.rpm
    执行 yum install esl-erlang_17.3-1~centos~6_amd64.rpm
    
    上传:esl-erlang-compat-R14B-1.el6.noarch.rpm
    yum install esl-erlang-compat-R14B-1.el6.noarch.rpm
    

    安装RabbitMQ

    上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
    安装:
    rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm
    

    启动、停止

    service rabbitmq-server start
    service rabbitmq-server stop
    service rabbitmq-server restart
    

    设置开机启动

    chkconfig rabbitmq-server on
    

    设置配置文件

    cd /etc/rabbitmq
    cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/
    mv rabbitmq.config.example rabbitmq.config
    

    开启用户远程访问

    vi /etc/rabbitmq/rabbitmq.config
    

    开启web界面管理工具

    rabbitmq-plugins enable rabbitmq_management
    service rabbitmq-server restart

    防火墙开放15672端口

    /sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
    /sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
    /etc/rc.d/init.d/iptables save


    MAC安装RabbitMQ

    安装:brew install rabbitmq

    配置环境变量:

    RabbitMQ Config

    需要在 bash 或者 zsh 的环境变量中添加:

    export PATH=$PATH:/usr/local/sbin
    

    不然下面的指令,就必须到/usr/local/sbin目录下,才能直接执行。
    启动:rabbitmq-server
    停止:rabbitmqctl stop
    状态:rabbitmqctl status

    访问:http://localhost:15672/
    默认的用户名和密码是 guestguest


    基本使用

    访问界面管理:


    使用 guest/guest 进行登录。

    添加用户

    界面介绍:


    创建虚拟主机

    用户授权虚拟主机




    通过交换机绑定队列或者通过队列绑定交换机都可以。



    队列

    简单队列


    P:消息的生产者
    C:消息的消费者
    红色:队列

    生产者将消息发送到队列,消费者从队列中获取消息。

    代码测试:

    导入下面测试代码,运行完 Send.java 消息队列中就有"Hello World! -- 1"字符串,在运行Recv.java,就会取出字符串。

    启动接收者Recv.java,就可以在 rabbitMQ 管理平台看到如下的一些信息:



    启动接收者Send.java,就可以在 rabbitMQ 看到队列中的信息:


    Recv.java

    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
            }
        }
    }
    

    Send.java

    public class Send {
    
        private final static String QUEUE_NAME = "test_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
    
            // 声明(创建)队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 消息内容
            String message = "Hello World! -- 1";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    

    ConnectionUtil.java

    public class ConnectionUtil {
    
        public static Connection getConnection() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("192.168.18.130");
            //端口
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setVirtualHost("/inke-test");
            factory.setUsername("inke");
            factory.setPassword("inke");
            // 通过工程获取连接
            Connection connection = factory.newConnection();
            return connection;
        }
    
    }
    

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.example</groupId>
        <artifactId>rabbitmq-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>rabbitmq-demo</name>
        <url>http://maven.apache.org</url>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.4.1</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.3.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.4.0.RELEASE</version>
            </dependency>
        </dependencies>
    </project>
    

    Work模式

    一个生产者、2个消费者。
    一个消息只能被一个消费者获取。

    Send.java

    public class Send {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            for (int i = 0; i < 50; i++) {
                // 消息内容
                String message = "" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
    
                Thread.sleep(i * 10);
            }
    
            channel.close();
            connection.close();
        }
    }
    

    Recv.java

    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 同一时刻服务器只会发一条消息给消费者
            // channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
                //休眠
                Thread.sleep(10);
                // 返回确认状态
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    Recv2.java

    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 同一时刻服务器只会发一条消息给消费者
            // channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成状态
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
                // 休眠1秒
                Thread.sleep(1000);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    测试结果:

    1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
    2、消费者1和消费者2获取到的消息的数量是相同的,一个是奇数一个是偶数。
    其实,这样是不合理的,应该是消费者1要比消费者2获取到的消息多才对。

    Work模式的“能者多劳”

    消费者需要打开这个开关


    测试结果:

    消费者1比消费者2获取的消息更多。


    消息的确认模式

    消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

    • 模式1:自动确认
      只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都认为是消息已经成功消费。

    • 模式2:手动确认
      消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

    手动模式:

    手动模式:

    解读:
    1、1个生产者,多个消费者
    2、每一个消费者都有自己的一个队列
    3、生产者没有将消息直接发送到队列,而是发送到了交换机
    4、每个队列都要绑定到交换机
    5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

    代码演示:

    需求是:一个后台服务提供给前台和搜索展示数据,如何保证数据的统一性。

    向交换机中发送消息。


    注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。

    消费者1(看作是前台系统)


    消费者2(看作是搜索系统)


    测试结果:

    同一个消息被多个消费者获取。

    在管理工具中查看队列和交换机的绑定关系:



    使用订阅模式能否实现商品数据的同步?

    答案:可以的。

    后台系统就是消息的生产者。
    前台系统和搜索系统是消息的消费者。
    后台系统将消息发送到交换机中,前台系统和搜索系统都创建自己的队列,然后将队列绑定到交换机,即可实现。

    消息,新增商品、修改商品、删除商品。

    前台系统:修改商品、删除商品。
    搜索系统:新增商品、修改商品、删除商品。

    所以使用订阅模式实现商品数据的同步并不合理。


    路由模式


    如图所示:c1 队列绑定了 error ,c2 队列绑定了 info、error、warning。
    如果发送 error 的消息,那么 c1 和 c2 都可以收到,发送 info、warning 只有c2可以收到,可以随意组合,更加自由。
    例如:前台系统只需要收取“更新“”删除”的消息,而搜索系统还需要收取“新增”的消息。





    测试结果:
    当发送的key是 insert 的时候,只有Recv2队列的test_queue_direct_2才能接收到消息,
    当发送的key是 delete、update的时候,Recv1和Recv2都能接收到消息。


    通配符模式

    将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
    符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。*
    因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
    topic交换机是如何工作的:


    生产者(后台系统)Send.java

    public class Send {
    
        private final static String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            // 消息内容
            String message = "商品删除,id=1003";
            channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
            System.out.println(" 后台系统: '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    

    消费者1(前台系统)Recv.java

    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_topic_1";
    
        private final static String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" 前台系统: '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    消费者2(搜索系统)Recv2.java

    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_topic_2";
    
        private final static String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" 搜索系统: '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    测试结果:item.# 匹配所有前缀是 item 的消息。


    Spring-Rabbit

    Spring项目
    http://spring.io/projects

    The project consists of two parts; spring-amqp is the base abstraction, and spring-rabbit is the RabbitMQ implementation.
    该项目由两部分组成: spring-amqp是基础抽象,spring-rabbit是RabbitMQ实现。

    代码演示:

    public class SpringMain {
        
        public static void main(final String... args) throws Exception {
            AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                    "classpath:spring/rabbitmq-context.xml");
            //RabbitMQ模板
            RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
            //发送消息
            template.convertAndSend("Hello, world!");
            Thread.sleep(1000);// 休眠1秒
            ctx.destroy(); //容器销毁
        }
    }
    
    /**
     * 消费者
     */
    public class Foo {
    
        //具体执行业务的方法
        public void listen(String foo) {
            System.out.println("消费者: " + foo);
        }
    }
    

    rabbitmq-context.xml

    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
        <!-- 定义RabbitMQ的连接工厂 -->
        <rabbit:connection-factory id="connectionFactory"
            host="192.168.18.130" port="5672" username="inke" password="inke" virtual-host="/inke-test" />
    
        <!-- 定义Rabbit模板,指定连接工厂以及定义exchange -->
        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
        <!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
            exchange="fanoutExchange" routing-key="foo.bar" /> -->
    
        <!-- MQ的管理,包括队列、交换器等 -->
        <rabbit:admin connection-factory="connectionFactory" />
    
        <!-- 定义队列,自动声明 -->
        <rabbit:queue name="myQueue" auto-declare="true" durable="false"/>
    
        <!-- 定义交换器,自动声明 -->
        <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" durable="false">
            <rabbit:bindings>
                <rabbit:binding queue="myQueue"/>
            </rabbit:bindings>
        </rabbit:fanout-exchange>
    
    <!--    <rabbit:topic-exchange name="myExchange">
            <rabbit:bindings>
                <rabbit:binding queue="myQueue" pattern="foo.*" />
            </rabbit:bindings>
        </rabbit:topic-exchange> -->
    
        <!-- 队列监听 -->
        <rabbit:listener-container connection-factory="connectionFactory">
            <rabbit:listener ref="foo" method="listen" queue-names="myQueue" />
        </rabbit:listener-container>
    
        <bean id="foo" class="com.example.rabbitmq.spring.Foo" />
    
    </beans>
    


    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
    这里可以直接选择发送到哪个队列,而不是交换机。
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" queue="myQueue"/>


    持久化交换机和队列

    持久化:将交换机或队列的数据保存到磁盘,服务器宕机或重启之后依然存在。
    非持久化:将交换机或队列的数据保存到内存,服务器宕机或重启之后将不存在。

    非持久化的性能高于持久化。
    如何选择持久化?非持久化?-- 看需求。



    参考自:
    某培训机构
    CSDN
    spring 官网

    相关文章

      网友评论

        本文标题:MQ之RabbitMQ

        本文链接:https://www.haomeiwen.com/subject/kghtqxtx.html