美文网首页
RabbitMQ-消息中间件

RabbitMQ-消息中间件

作者: 爪洼程序员一枚 | 来源:发表于2020-12-21 13:50 被阅读0次

    一、MQ基本概念

    本笔记涉及的代码:百度网盘链接
    链接:https://pan.baidu.com/s/1kros8iRqhC6YhzNvRIV7vg
    提取码:3ixc
    环境:jdk1.8 maven-3.5.2

    1.概念

    image.png
    image.png

    2.MQ的优势和劣势:

    image.png

    ·系统的耦合度越高,容错性就越低,可维护性就越低,使用MQ使得应用间解耦,提升容错性和可维护性。
    ·异步提速:提升用户体验和系统吞吐量(单位时间内处理请求的数目)
    ·削峰填谷: 提高系统的稳定性


    image.png

    3.常见的MQ产品

    image.png

    4.RabbitMQ简介

    image.png
    image.png
    image.png
    image.png
    image.png

    4.JMS简介

    image.png

    二、RabbitMQ安装和配置

    官网 https://www.rabbitmq.com/
    1.点击Get Started

    image.png
    2.下载安装和文档
    image.png
    3.下载安装RabbitMQ
    RabbitMQ默认安装位置: /usr/share/doc/rabbitmq-server-3.6.5/
    (1)注意:安装RabbitMQ选择版本时,RabbitMQ版本要和安装的Erlang版本一致,不然会安装出错,访问下面网址,查看对应版本信息。
    https://www.rabbitmq.com/which-erlang.html
    
    image.png

    (2)安装依赖环境
    在线安装依赖环境:

    yum install -y build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc x
    

    (3)下载安装包
    下载erlang:

    wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
    

    下载socat:

    wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
    

    下载rabbitmq-server:

    wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
    
    image.png

    (4)安装
    安装erlang

    rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
    

    安装socat加解密软件

    rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
    

    安装rabbitmq

    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm 
    
    image.png

    (5)开启管理界面及配置

    开启管理界面及配置

    rabbitmq-plugins enable rabbitmq_management
    

    修改默认配置信息

    cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin
    vim rabbit.app
    

    比如修改密码、配置等等,例如:loopback_users中的<<"guest">>,只保留guest。
    将guest用户打开才能进入rabbitmq控制台


    image.png

    (6)启动
    service rabbitmq-server start #启动服务
    service rabbitmq-server stop #停止服务
    service rabbitmq-server restart #重启服务
    (7)rabbitmq控制台启动登录
    第一步:service rabbitmq-server start #启动服务
    第二步:在浏览器中访问 linux系统ip:15672 会出现下面的页面 用户名和密码都是 guest


    image.png
    注意:如果访问失败:执行 iptables -F 命令关闭防火墙,在重新进行访问。如果你已经启动了rabbitmq后,修改了rabbit.app文件就需要重新启动rabbitmq。
    image.png
    出现这个界面就意味着rabbitmq的管理控制台和server安装成功了。

    三、RabbitMQ管理平台使用

    节点配置文件配置:


    image.png
    image.png
    image.png

    1.添加一个用户

    image.png
    image.png

    2.添加一个虚拟机

    image.png

    添加成功后出现:


    image.png
    image.png
    image.png

    设置这些后,可以查看到我们创建的hhb用户可以访问/hhb虚拟机


    image.png
    image.png

    四、RabbitMQ入门程序

    需求:使用简单模式完成消息传递
    步骤:
    (1)创建工程(生产者、消费者)
    (2)分别添加依赖
    (3)编写生产者发送消息

    1.新建两个maven工程rabbit_producer (生产者)、rabbit_consumer(消费者)

    在pom.xml中都添加下面依赖

    <dependencies>
            <!-- rabbitmq java客户端-->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.6.0</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    

    2.编写生产者java代码

    image.png
    image.png
    public class ProducerHelloWorld {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
            factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
            factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
            factory.setUsername("hhb");             //用户名;默认是 guest
            factory.setPassword("hhb");             //密码;默认是 guest
            //3.创建连接 Connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.创建队列Queue
            /*
             queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
             参数:
                1.queue:队列名称
                2.durable:是否持久化,如果持久化,会保存到erlang自带的一个数据库中。当mq重启之后,还在。
                3.exclusive:一般设置为false
                    *是否独占;只能有一个消费者监听这队列
                    *当Connection关闭时,是否删除队列
                4.autoDelete:是否自动删除。当没有consumer时,自动删除掉。
                5.arguments:删除队列信息的一些参数信息
             */
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare("hello_world",true,false,false,null);
            //6.发送消息
            /*
                basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
                参数:
                    * 1.交换机名称;简单模式下交换机会使用默认的"",如果使用默认的"",路由名称routingKey要和队列名称一样
                    * 2.routingKey:路由名称
                    * 3.props:配置信息
                    * 4.body:真实发送的消息数据
             */
            String body="hello rabbitmq O_O";
            channel.basicPublish("","hello_world",null,body.getBytes());
    
            //7.释放资源
            channel.close();
            connection.close();
        }
    }
    

    当我们执行代码后:


    image.png
    image.png

    3.编写消费者java代码

    public class ConsumerHelloWorld {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
            factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
            factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
            factory.setUsername("hhb");             //用户名;默认是 guest
            factory.setPassword("hhb");             //密码;默认是 guest
            //3.创建连接 Connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.创建队列Queue
            /*
             queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
             参数:
                1.queue:队列名称
                2.durable:是否持久化,如果持久化,会保存到erlang自带的一个数据库中。当mq重启之后,还在。
                3.exclusive:一般设置为false
                    *是否独占;只能有一个消费者监听这队列
                    *当Connection关闭时,是否删除队列
                4.autoDelete:是否自动删除。当没有consumer时,自动删除掉。
                5.arguments:删除队列信息的一些参数信息
             */
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare("hello_world",true,false,false,null);
            //6.接收消息
            /*
                basicConsume(String queue, boolean autoAck, Consumer callback)
                参数:
                    * 1.queue: 队列名称
                    * 2.autoAck:是否自动确认
                    * 3.callback:回调对象
            */
            Consumer consumer=new DefaultConsumer(channel){
                /*
                     回调方法,当收到消息后,会自动执行该方法。
                     1.consumerTag:标识
                     2.envelope:获取一些信息,交换机,路由key
                     3.properties:配置信息
                     4.body:真实数据
                */
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("consumerTag: "+consumerTag);
                    System.out.println("envelope: "+envelope.getExchange());
                    System.out.println("properties: "+properties);
                    System.out.println("body: "+ new String(body));
                }
            };
            channel.basicConsume("hello_world",true,consumer);
    
            //7.不要关闭资源
    
        }
    }
    

    消费前:


    image.png

    消费后:


    image.png
    image.png

    小结:


    image.png

    五、RabbitMQ工作模式

    上面已经讲过简单模式。(一条消息只能被一个消费者消费)

    5.1Work queues 工作队列模式 (一条消息只能被一个消费者消费)

    1.模式说明


    image.png

    2.生产者代码


    image.png
    /*
     *发送消息
     */
    public class ProducerWorkQueues {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
            factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
            factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
            factory.setUsername("hhb");             //用户名;默认是 guest
            factory.setPassword("hhb");             //密码;默认是 guest
            //3.创建连接 Connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.创建队列Queue
            /*
             queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
             参数:
                1.queue:队列名称
                2.durable:是否持久化,如果持久化,会保存到erlang自带的一个数据库中。当mq重启之后,还在。
                3.exclusive:一般设置为false
                    *是否独占;只能有一个消费者监听这队列
                    *当Connection关闭时,是否删除队列
                4.autoDelete:是否自动删除。当没有consumer时,自动删除掉。
                5.arguments:删除队列信息的一些参数信息
             */
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare("work_queues",true,false,false,null);
            //6.发送消息
            /*
                basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
                参数:
                    * 1.交换机名称;简单模式下交换机会使用默认的"",如果使用默认的"",路由名称routingKey要和队列名称一样
                    * 2.routingKey:路由名称
                    * 3.props:配置信息
                    * 4.body:真实发送的消息数据
             */
            // 我们模拟一次发送多条消息。
            for (int i = 0; i < 10; i++) {
                String body=i+" hello rabbitmq O_O";
                channel.basicPublish("","hello_world",null,body.getBytes());
            }
    
            //7.释放资源
            channel.close();
            connection.close();
        }
    }
    

    3.消费者代码


    image.png
    public class ConsumerWorkQueues {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
            factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
            factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
            factory.setUsername("hhb");             //用户名;默认是 guest
            factory.setPassword("hhb");             //密码;默认是 guest
            //3.创建连接 Connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.创建队列Queue
            /*
             queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
             参数:
                1.queue:队列名称
                2.durable:是否持久化,如果持久化,会保存到erlang自带的一个数据库中。当mq重启之后,还在。
                3.exclusive:一般设置为false
                    *是否独占;只能有一个消费者监听这队列
                    *当Connection关闭时,是否删除队列
                4.autoDelete:是否自动删除。当没有consumer时,自动删除掉。
                5.arguments:删除队列信息的一些参数信息
             */
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare("work_queues",true,false,false,null);
            //6.接收消息
            /*
                basicConsume(String queue, boolean autoAck, Consumer callback)
                参数:
                    * 1.queue: 队列名称
                    * 2.autoAck:是否自动确认
                    * 3.callback:回调对象
            */
            Consumer consumer=new DefaultConsumer(channel){
                /*
                     回调方法,当收到消息后,会自动执行该方法。
                     1.consumerTag:标识
                     2.envelope:获取一些信息,交换机,路由key
                     3.properties:配置信息
                     4.body:真实数据
                */
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //                System.out.println("consumerTag: "+consumerTag);
    //                System.out.println("envelope: "+envelope.getExchange());
    //                System.out.println("properties: "+properties);
                    System.out.println("body: "+ new String(body));
                }
            };
            channel.basicConsume("work_queues",true,consumer);
    
            //7.不要关闭资源
    
        }
    }
    

    两个java(ConsumerWorkQueues和ConsumerWorkQueues2)文件内容代码是一样的。
    4.测试
    第一步:启动两个消费者


    image.png

    启动后可以看到如下,队列work_queues:


    image.png
    第二步:启动生产者
    image.png
    生产者启动后,两个消费者立即消费,它们依次轮流消费一个消息:
    image.png
    image.png

    小结:


    image.png

    5.2Pub/Sub 订阅模式模式 (从这里开始后面的一条消息可以被多个消费者消费)

    1.模式说明(这里开始加入了交换机,交换机类型为BuiltinExchangeType.FANOUT)


    image.png

    生产者发送消息后通过交换机,交换机会把消息发给每一个与该交换机绑定的队列。
    2.需求
    生产者生产一条日志消息,一个消费者把日志信息打印出来,另一个消费者把日志信息存在数据库。
    3.生产者代码


    image.png
    public class ProducerPubSub {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
            factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
            factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
            factory.setUsername("hhb");             //用户名;默认是 guest
            factory.setPassword("hhb");             //密码;默认是 guest
            //3.创建连接 Connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
           //5.创建交换机
            /*
                exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
                参数:
                    * 1.exchange:交换机名称
                    * 2.type:交换机类型 ,枚举
                        DIRECT("direct"), 定向
                        FANOUT("fanout"), 扇形(广播),发送消息给每一个与子绑定队列
                        TOPIC("topic"),   通配符
                        HEADERS("headers");参数匹配
                    * 3.durable:是否持久化
                    * 4.autoDelete:自动删除
                    * 5.internal:内部使用。一般false
                    * 6.arguments:参数
             */
            String exchangeName = "test_fanout";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
            //6.创建队列
            String queue1Name="test_fanout_queue1";
            String queue2Name="test_fanout_queue2";
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            //7.绑定队列和交换机
            /*
                queueBind(String queue, String exchange, String routingKey)
                参数:
                    * 1.queue:队列名称
                    * 2.exchange:交换机名称
                    * 3.routingKey:路由键,绑定规则
                        如果交换机的类型为fanout,routingKey设置为"",意味着交换机会把消息分发给每一个与之绑定的队列
             */
            channel.queueBind(queue1Name,exchangeName,"");
            channel.queueBind(queue2Name,exchangeName,"");
            //8.发送消息
            String body="日志信息: 张三调用了findAll方法------日志级别:info-----";
            channel.basicPublish(exchangeName,"",null,body.getBytes());
            //9.释放资源
            channel.close();
            connection.close();
        }
    }
    
    image.png
    image.png

    4.消费者代码


    image.png
    public class ConsumerPubSub1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
            factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
            factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
            factory.setUsername("hhb");             //用户名;默认是 guest
            factory.setPassword("hhb");             //密码;默认是 guest
            //3.创建连接 Connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.接收消息
            /*
                basicConsume(String queue, boolean autoAck, Consumer callback)
                参数:
                    * 1.queue: 队列名称
                    * 2.autoAck:是否自动确认
                    * 3.callback:回调对象
            */
            Consumer consumer=new DefaultConsumer(channel){
                /*
                     回调方法,当收到消息后,会自动执行该方法。
                     1.consumerTag:标识
                     2.envelope:获取一些信息,交换机,路由key
                     3.properties:配置信息
                     4.body:真实数据
                */
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //                System.out.println("consumerTag: "+consumerTag);
    //                System.out.println("envelope: "+envelope.getExchange());
    //                System.out.println("properties: "+properties);
                    System.out.println("body: "+ new String(body));
                    System.out.println("把日志打印到控制台");
                }
            };
            String queue1Name="test_fanout_queue1";
            channel.basicConsume(queue1Name,true,consumer);
    
            //6.不要关闭资源
    
        }
    }
    

    ConsumerPubSub1 和ConsumerPubSub2代码基本一致,区别在于对于日志信息打印不同。


    image.png

    5.3Rounting路由模式

    1.模式说明(交换机根据不同的路由key选择不同的队列,交换机类型为BuiltinExchangeType.DIRECT)


    image.png

    2.生产者代码


    image.png
    public class ProducerRounting {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
            factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
            factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
            factory.setUsername("hhb");             //用户名;默认是 guest
            factory.setPassword("hhb");             //密码;默认是 guest
            //3.创建连接 Connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
           //5.创建交换机
            /*
                exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
                参数:
                    * 1.exchange:交换机名称
                    * 2.type:交换机类型 ,枚举
                        DIRECT("direct"), 定向
                        FANOUT("fanout"), 扇形(广播),发送消息给每一个与子绑定队列
                        TOPIC("topic"),   通配符
                        HEADERS("headers");参数匹配
                    * 3.durable:是否持久化
                    * 4.autoDelete:自动删除
                    * 5.internal:内部使用。一般false
                    * 6.arguments:参数
             */
            String exchangeName = "test_direct";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
            //6.创建队列
            String queue1Name="test_direct_queue1";
            String queue2Name="test_direct_queue2";
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            //7.绑定队列和交换机
            /*
                queueBind(String queue, String exchange, String routingKey)
                参数:
                    * 1.queue:队列名称
                    * 2.exchange:交换机名称
                    * 3.routingKey:路由键,绑定规则
                        如果交换机的类型为fanout,routingKey设置为"",意味着交换机会把消息分发给每一个与之绑定的队列
             */
            //队列1的绑定
            channel.queueBind(queue1Name,exchangeName,"error");
            //队列2的绑定
            channel.queueBind(queue2Name,exchangeName,"info");
            channel.queueBind(queue2Name,exchangeName,"error");
            channel.queueBind(queue2Name,exchangeName,"warning");
            //8.发送消息
            String body="日志信息: 张三调用了findAll方法------日志级别:info-----";
            channel.basicPublish(exchangeName,"info",null,body.getBytes());
            //9.释放资源
            channel.close();
            connection.close();
        }
    }
    

    运行后:


    image.png
    image.png

    3.消费者代码


    image.png
    //这是ConsumerRounting1代码
    public class ConsumerRounting1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
            factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
            factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
            factory.setUsername("hhb");             //用户名;默认是 guest
            factory.setPassword("hhb");             //密码;默认是 guest
            //3.创建连接 Connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.接收消息
            /*
                basicConsume(String queue, boolean autoAck, Consumer callback)
                参数:
                    * 1.queue: 队列名称
                    * 2.autoAck:是否自动确认
                    * 3.callback:回调对象
            */
            Consumer consumer=new DefaultConsumer(channel){
                /*
                     回调方法,当收到消息后,会自动执行该方法。
                     1.consumerTag:标识
                     2.envelope:获取一些信息,交换机,路由key
                     3.properties:配置信息
                     4.body:真实数据
                */
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //                System.out.println("consumerTag: "+consumerTag);
    //                System.out.println("envelope: "+envelope.getExchange());
    //                System.out.println("properties: "+properties);
                    System.out.println("body: "+ new String(body));
                }
            };
            String queue1Name="test_direct_queue1";
            channel.basicConsume(queue1Name,true,consumer);
    
            //6.不要关闭资源
    
        }
    }
    

    当我们执行ConsumerRounting2代码时,打印了日志信息。ConsumerRounting2和ConsumerRounting1代码基本一样,只是监听的队列名不一样;


    image.png
    image.png

    小结:


    image.png

    5.4Topics通配符模式

    1.模式 (交换机类型类型为BuiltinExchangeType.TOPIC)


    image.png

    2.生产者代码


    image.png
    public class ProducerTopics {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
            factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
            factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
            factory.setUsername("hhb");             //用户名;默认是 guest
            factory.setPassword("hhb");             //密码;默认是 guest
            //3.创建连接 Connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
           //5.创建交换机
            /*
                exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
                参数:
                    * 1.exchange:交换机名称
                    * 2.type:交换机类型 ,枚举
                        DIRECT("direct"), 定向
                        FANOUT("fanout"), 扇形(广播),发送消息给每一个与子绑定队列
                        TOPIC("topic"),   通配符
                        HEADERS("headers");参数匹配
                    * 3.durable:是否持久化
                    * 4.autoDelete:自动删除
                    * 5.internal:内部使用。一般false
                    * 6.arguments:参数
             */
            String exchangeName = "test_Topics";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
            //6.创建队列
            String queue1Name="test_Topics_queue1";
            String queue2Name="test_Topics_queue2";
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            //7.绑定队列和交换机
            /*
                queueBind(String queue, String exchange, String routingKey)
                参数:
                    * 1.queue:队列名称
                    * 2.exchange:交换机名称
                    * 3.routingKey:路由键,绑定规则
                        如果交换机的类型为fanout,routingKey设置为"",意味着交换机会把消息分发给每一个与之绑定的队列
             */
            //rounting key 假如由   系统的名称.日志的级别  这两部分组成
            //需求:
            //  队列1:所有error级别的日志存入数据库,所有order系统的日志存入数据库。
            //  队列2:所有日志信息打印到控制台
            //队列1的绑定
            channel.queueBind(queue1Name,exchangeName,"#.error");
            channel.queueBind(queue1Name,exchangeName,"order.*");
            //队列2的绑定
            channel.queueBind(queue2Name,exchangeName,"*.*");
            //8.发送消息
            String body="日志信息: 张三调用了findAll方法------日志级别:info-----";
            channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
            //9.释放资源
            channel.close();
            connection.close();
        }
    }
    

    生产者代码执行后:


    image.png
    image.png

    3.消费者代码


    image.png
    image.png
    代码:
    public class ConsumerTopic1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory=new ConnectionFactory();
            //2.设置参数
            factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
            factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
            factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
            factory.setUsername("hhb");             //用户名;默认是 guest
            factory.setPassword("hhb");             //密码;默认是 guest
            //3.创建连接 Connection
            Connection connection = factory.newConnection();
            //4.创建channel
            Channel channel = connection.createChannel();
            //5.接收消息
            /*
                basicConsume(String queue, boolean autoAck, Consumer callback)
                参数:
                    * 1.queue: 队列名称
                    * 2.autoAck:是否自动确认
                    * 3.callback:回调对象
            */
            Consumer consumer=new DefaultConsumer(channel){
                /*
                     回调方法,当收到消息后,会自动执行该方法。
                     1.consumerTag:标识
                     2.envelope:获取一些信息,交换机,路由key
                     3.properties:配置信息
                     4.body:真实数据
                */
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //                System.out.println("consumerTag: "+consumerTag);
    //                System.out.println("envelope: "+envelope.getExchange());
    //                System.out.println("properties: "+properties);
                    System.out.println("body: "+ new String(body));
                    System.out.println("error级别的日志,所有order系统的日志已存入数据库。");
                }
            };
            String queue1Name="test_Topics_queue1";
            channel.basicConsume(queue1Name,true,consumer);
    
            //6.不要关闭资源
    
        }
    }
    

    ConsumerTopic1和ConsumerTopic2代码差不多。
    小结:


    image.png

    5.5 工作模式总结

    image.png

    六、Spring整合RabbitMQ

    6.1 生产者

    (1)创建生产者工程naven工程(spring-rabbitmq-producer)
    (2)添加依赖

    <dependencies>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>5.2.2.RELEASE</version>
            </dependency>
            <!-- spring整合rabbitmq的依赖包-->
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>2.1.8.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>5.2.2.RELEASE</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    

    (3)配置整合
    1.rabbitmq.properties

    rabbitmq.host=192.168.2.34
    rabbitmq.port=5672
    rabbitmq.username=hhb
    rabbitmq.password=hhb
    rabbitmq.virtual-host=/hhb
    
      2.spring-rabbitmq-producer.xml
    
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        <!--加载配置文件-->
        <context:property-placeholder location="rabbitmq.properties"/>
    
        <!--定义rabbitmq connectionFactory-->
        <rabbit:connection-factory
            id="connectionFactory"
            host="${rabbitmq.host}"
            port="${rabbitmq.port}"
            username="${rabbitmq.username}"
            password="${rabbitmq.password}"
            virtual-host="${rabbitmq.virtual-host}"/>
    
        <!--定义管理交换机、队列-->
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~简单方式~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
        <!--定义持久化队列,不存在则自动创建,不绑定到交换机则绑定到默认交换机
         默认交换机类型为direct,名字为:"",路由键为队列的名称-->
        <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
    
        <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播:所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
        <!--定义广播交换机中的持久化队列,不存在则自动创建 -->
        <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
    
        <!--定义广播交换机中的持久化队列,不存在则自动创建 -->
        <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
    
        <!--定义广播类型交换机,并绑定上述两个队列 -->
        <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
            <rabbit:bindings>
                <rabbit:binding queue="spring_fanout_queue_1"/>
                <rabbit:binding queue="spring_fanout_queue_2"/>
            </rabbit:bindings>
        </rabbit:fanout-exchange>
    
        <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符:*匹配一个单词,#匹配多个单词~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
        <!--定义广播交换机中的持久化队列,不存在则自动创建 -->
        <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
    
        <!--定义广播交换机中的持久化队列,不存在则自动创建 -->
        <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
    
        <!--定义广播交换机中的持久化队列,不存在则自动创建 -->
        <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
    
        <!--定义广播类型交换机,并绑定上述两个队列 -->
        <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
            <rabbit:bindings>
                <rabbit:binding pattern="hhb.*" queue="spring_topic_queue_star"/>
                <rabbit:binding pattern="hhb.#" queue="spring_topic_queue_well"/>
                <rabbit:binding pattern="it.#" queue="spring_topic_queue_well2"/>
            </rabbit:bindings>
        </rabbit:topic-exchange>
    
        <!--定义rabbitTemplate对象操作可以在代码中方便发送消息 -->
        <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    </beans>
    

    (4)编写代码发送消息


    image.png

    代码:

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
    public class ProducerTest {
        //1.注入RabbitTemplate
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testHelloWorld(){
            //2.发送消息
            rabbitTemplate.convertAndSend("spring_queue","hello world spring");
        }
        /**
         *发送fanout消息
         */
        @Test
        public void testFanout(){
            //2.发送消息
            rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout...");
        }
    
        /**
         *发送topic消息
         */
        @Test
        public void testTopic(){
            //2.发送消息
            rabbitTemplate.convertAndSend("spring_topic_exchange","hhb.nice","spring topic...");
        }
    }
    

    执行代码以后:


    image.png

    6.2 消费者

    (1)创建消费者工程 maven工程 (spring-rabbitmq-consumer)
    (2)添加依赖

        <dependencies>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>5.2.2.RELEASE</version>
            </dependency>
            <!-- spring整合rabbitmq的依赖包-->
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>2.1.8.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>5.2.2.RELEASE</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    

    (3)配置文件
    1.rabbitmq.properties 与生产者一样
    2.spring-rabbitmq-consumer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        <!--加载配置文件-->
        <context:property-placeholder location="rabbitmq.properties"/>
    
        <!--定义rabbitmq connectionFactory-->
        <rabbit:connection-factory
            id="connectionFactory"
            host="${rabbitmq.host}"
            port="${rabbitmq.port}"
            username="${rabbitmq.username}"
            password="${rabbitmq.password}"
            virtual-host="${rabbitmq.virtual-host}"/>
        <!--消费代码类(消息监听器)-->
        <bean id="springQueueListener" class="com.hhb.rabbitmq.listener.SpringQueueListener"/>
        <bean id="fanoutListener1" class="com.hhb.rabbitmq.listener.FanoutListener1"/>
        <bean id="fanoutListener2" class="com.hhb.rabbitmq.listener.FanoutListener2"/>
        <bean id="topicListenerStar" class="com.hhb.rabbitmq.listener.TopicListenerStar"/>
        <bean id="topicListenerWell" class="com.hhb.rabbitmq.listener.TopicListenerWell"/>
        <bean id="topicListenerWell2" class="com.hhb.rabbitmq.listener.TopicListenerWell2"/>
    
        <!--监听器容器 监听器监听哪个队列-->
        <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
            <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
            <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
            <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
            <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
            <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
            <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
        </rabbit:listener-container>
    </beans>
    

    (4)编写消息监听器


    image.png

    以其中SpringQueueListener为例(其他类都是一样的写法):


    image.png
    编写一个测试类,进行测试:
    image.png

    七、SpringBoot整合RabbitMQ

    7.1生产者

    步骤:


    image.png

    1.创建生产者SpringBoot工程
    2.引入依赖
    3.编写yml配置

    spring:
      rabbitmq:
        username: guest
        password: guest
        virtual-host: /
        addresses: 192.168.2.34:5672
    

    4.定义交换机、队列,绑定交换机队列


    image.png
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
        public  static final String EXCHANGE_NAME="boot_topic_exchange";
        public  static final String QUEUE_NAME="boot_queue";
        //1.交换机
        @Bean("bootExchange")
        public Exchange bootExchange(){
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        }
        //2.queue队列
        @Bean("bootQueue")
        public Queue bootQueue(){
            return QueueBuilder.durable(QUEUE_NAME).build();
        }
        //3.交换机和队列绑定
        /*
            1.知道哪个队列
            2.知道哪个交换机
            3.routing key
         */
        @Bean
        public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
        }
    }
    

    5.发送消息


    image.png
    image.png

    7.2消费者

    image.png

    1.创建生产者SpringBoot工程
    2.引入依赖
    3.编写yml配置

    spring:
      rabbitmq:
        username: guest
        password: guest
        virtual-host: /
        addresses: 192.168.2.34:5672
    

    4.监听类


    image.png

    启动主启动类:


    image.png
    小结:
    image.png

    相关文章

      网友评论

          本文标题:RabbitMQ-消息中间件

          本文链接:https://www.haomeiwen.com/subject/nkyrnktx.html