美文网首页
RabbitMQ-消息中间件

RabbitMQ-消息中间件

作者: 爪洼程序员一枚 | 来源:发表于2020-12-21 13:50 被阅读0次

一、MQ基本概念

本笔记涉及的代码:百度网盘链接
链接:https://pan.baidu.com/s/1kros8iRqhC6YhzNvRIV7vg
提取码:3ixc
环境:jdk1.8 maven-3.5.2

1.概念

image.png
image.png

2.MQ的优势和劣势:

image.png

·系统的耦合度越高,容错性就越低,可维护性就越低,使用MQ使得应用间解耦,提升容错性和可维护性。
·异步提速:提升用户体验和系统吞吐量(单位时间内处理请求的数目)
·削峰填谷: 提高系统的稳定性


image.png

3.常见的MQ产品

image.png

4.RabbitMQ简介

image.png
image.png
image.png
image.png
image.png

4.JMS简介

image.png

二、RabbitMQ安装和配置

官网 https://www.rabbitmq.com/
1.点击Get Started

image.png
2.下载安装和文档
image.png
3.下载安装RabbitMQ
RabbitMQ默认安装位置: /usr/share/doc/rabbitmq-server-3.6.5/
(1)注意:安装RabbitMQ选择版本时,RabbitMQ版本要和安装的Erlang版本一致,不然会安装出错,访问下面网址,查看对应版本信息。
https://www.rabbitmq.com/which-erlang.html
image.png

(2)安装依赖环境
在线安装依赖环境:

yum install -y build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc x

(3)下载安装包
下载erlang:

wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm

下载socat:

wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

下载rabbitmq-server:

wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
image.png

(4)安装
安装erlang

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

安装socat加解密软件

rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

安装rabbitmq

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm 
image.png

(5)开启管理界面及配置

开启管理界面及配置

rabbitmq-plugins enable rabbitmq_management

修改默认配置信息

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin
vim rabbit.app

比如修改密码、配置等等,例如:loopback_users中的<<"guest">>,只保留guest。
将guest用户打开才能进入rabbitmq控制台


image.png

(6)启动
service rabbitmq-server start #启动服务
service rabbitmq-server stop #停止服务
service rabbitmq-server restart #重启服务
(7)rabbitmq控制台启动登录
第一步:service rabbitmq-server start #启动服务
第二步:在浏览器中访问 linux系统ip:15672 会出现下面的页面 用户名和密码都是 guest


image.png
注意:如果访问失败:执行 iptables -F 命令关闭防火墙,在重新进行访问。如果你已经启动了rabbitmq后,修改了rabbit.app文件就需要重新启动rabbitmq。
image.png
出现这个界面就意味着rabbitmq的管理控制台和server安装成功了。

三、RabbitMQ管理平台使用

节点配置文件配置:


image.png
image.png
image.png

1.添加一个用户

image.png
image.png

2.添加一个虚拟机

image.png

添加成功后出现:


image.png
image.png
image.png

设置这些后,可以查看到我们创建的hhb用户可以访问/hhb虚拟机


image.png
image.png

四、RabbitMQ入门程序

需求:使用简单模式完成消息传递
步骤:
(1)创建工程(生产者、消费者)
(2)分别添加依赖
(3)编写生产者发送消息

1.新建两个maven工程rabbit_producer (生产者)、rabbit_consumer(消费者)

在pom.xml中都添加下面依赖

<dependencies>
        <!-- rabbitmq java客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.编写生产者java代码

image.png
image.png
public class ProducerHelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
        factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
        factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
        factory.setUsername("hhb");             //用户名;默认是 guest
        factory.setPassword("hhb");             //密码;默认是 guest
        //3.创建连接 Connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建队列Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
            1.queue:队列名称
            2.durable:是否持久化,如果持久化,会保存到erlang自带的一个数据库中。当mq重启之后,还在。
            3.exclusive:一般设置为false
                *是否独占;只能有一个消费者监听这队列
                *当Connection关闭时,是否删除队列
            4.autoDelete:是否自动删除。当没有consumer时,自动删除掉。
            5.arguments:删除队列信息的一些参数信息
         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world",true,false,false,null);
        //6.发送消息
        /*
            basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                * 1.交换机名称;简单模式下交换机会使用默认的"",如果使用默认的"",路由名称routingKey要和队列名称一样
                * 2.routingKey:路由名称
                * 3.props:配置信息
                * 4.body:真实发送的消息数据
         */
        String body="hello rabbitmq O_O";
        channel.basicPublish("","hello_world",null,body.getBytes());

        //7.释放资源
        channel.close();
        connection.close();
    }
}

当我们执行代码后:


image.png
image.png

3.编写消费者java代码

public class ConsumerHelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
        factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
        factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
        factory.setUsername("hhb");             //用户名;默认是 guest
        factory.setPassword("hhb");             //密码;默认是 guest
        //3.创建连接 Connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建队列Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
            1.queue:队列名称
            2.durable:是否持久化,如果持久化,会保存到erlang自带的一个数据库中。当mq重启之后,还在。
            3.exclusive:一般设置为false
                *是否独占;只能有一个消费者监听这队列
                *当Connection关闭时,是否删除队列
            4.autoDelete:是否自动删除。当没有consumer时,自动删除掉。
            5.arguments:删除队列信息的一些参数信息
         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world",true,false,false,null);
        //6.接收消息
        /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                * 1.queue: 队列名称
                * 2.autoAck:是否自动确认
                * 3.callback:回调对象
        */
        Consumer consumer=new DefaultConsumer(channel){
            /*
                 回调方法,当收到消息后,会自动执行该方法。
                 1.consumerTag:标识
                 2.envelope:获取一些信息,交换机,路由key
                 3.properties:配置信息
                 4.body:真实数据
            */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: "+consumerTag);
                System.out.println("envelope: "+envelope.getExchange());
                System.out.println("properties: "+properties);
                System.out.println("body: "+ new String(body));
            }
        };
        channel.basicConsume("hello_world",true,consumer);

        //7.不要关闭资源

    }
}

消费前:


image.png

消费后:


image.png
image.png

小结:


image.png

五、RabbitMQ工作模式

上面已经讲过简单模式。(一条消息只能被一个消费者消费)

5.1Work queues 工作队列模式 (一条消息只能被一个消费者消费)

1.模式说明


image.png

2.生产者代码


image.png
/*
 *发送消息
 */
public class ProducerWorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
        factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
        factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
        factory.setUsername("hhb");             //用户名;默认是 guest
        factory.setPassword("hhb");             //密码;默认是 guest
        //3.创建连接 Connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建队列Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
            1.queue:队列名称
            2.durable:是否持久化,如果持久化,会保存到erlang自带的一个数据库中。当mq重启之后,还在。
            3.exclusive:一般设置为false
                *是否独占;只能有一个消费者监听这队列
                *当Connection关闭时,是否删除队列
            4.autoDelete:是否自动删除。当没有consumer时,自动删除掉。
            5.arguments:删除队列信息的一些参数信息
         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("work_queues",true,false,false,null);
        //6.发送消息
        /*
            basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                * 1.交换机名称;简单模式下交换机会使用默认的"",如果使用默认的"",路由名称routingKey要和队列名称一样
                * 2.routingKey:路由名称
                * 3.props:配置信息
                * 4.body:真实发送的消息数据
         */
        // 我们模拟一次发送多条消息。
        for (int i = 0; i < 10; i++) {
            String body=i+" hello rabbitmq O_O";
            channel.basicPublish("","hello_world",null,body.getBytes());
        }

        //7.释放资源
        channel.close();
        connection.close();
    }
}

3.消费者代码


image.png
public class ConsumerWorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
        factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
        factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
        factory.setUsername("hhb");             //用户名;默认是 guest
        factory.setPassword("hhb");             //密码;默认是 guest
        //3.创建连接 Connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.创建队列Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
            1.queue:队列名称
            2.durable:是否持久化,如果持久化,会保存到erlang自带的一个数据库中。当mq重启之后,还在。
            3.exclusive:一般设置为false
                *是否独占;只能有一个消费者监听这队列
                *当Connection关闭时,是否删除队列
            4.autoDelete:是否自动删除。当没有consumer时,自动删除掉。
            5.arguments:删除队列信息的一些参数信息
         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("work_queues",true,false,false,null);
        //6.接收消息
        /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                * 1.queue: 队列名称
                * 2.autoAck:是否自动确认
                * 3.callback:回调对象
        */
        Consumer consumer=new DefaultConsumer(channel){
            /*
                 回调方法,当收到消息后,会自动执行该方法。
                 1.consumerTag:标识
                 2.envelope:获取一些信息,交换机,路由key
                 3.properties:配置信息
                 4.body:真实数据
            */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag: "+consumerTag);
//                System.out.println("envelope: "+envelope.getExchange());
//                System.out.println("properties: "+properties);
                System.out.println("body: "+ new String(body));
            }
        };
        channel.basicConsume("work_queues",true,consumer);

        //7.不要关闭资源

    }
}

两个java(ConsumerWorkQueues和ConsumerWorkQueues2)文件内容代码是一样的。
4.测试
第一步:启动两个消费者


image.png

启动后可以看到如下,队列work_queues:


image.png
第二步:启动生产者
image.png
生产者启动后,两个消费者立即消费,它们依次轮流消费一个消息:
image.png
image.png

小结:


image.png

5.2Pub/Sub 订阅模式模式 (从这里开始后面的一条消息可以被多个消费者消费)

1.模式说明(这里开始加入了交换机,交换机类型为BuiltinExchangeType.FANOUT)


image.png

生产者发送消息后通过交换机,交换机会把消息发给每一个与该交换机绑定的队列。
2.需求
生产者生产一条日志消息,一个消费者把日志信息打印出来,另一个消费者把日志信息存在数据库。
3.生产者代码


image.png
public class ProducerPubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
        factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
        factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
        factory.setUsername("hhb");             //用户名;默认是 guest
        factory.setPassword("hhb");             //密码;默认是 guest
        //3.创建连接 Connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
       //5.创建交换机
        /*
            exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
            参数:
                * 1.exchange:交换机名称
                * 2.type:交换机类型 ,枚举
                    DIRECT("direct"), 定向
                    FANOUT("fanout"), 扇形(广播),发送消息给每一个与子绑定队列
                    TOPIC("topic"),   通配符
                    HEADERS("headers");参数匹配
                * 3.durable:是否持久化
                * 4.autoDelete:自动删除
                * 5.internal:内部使用。一般false
                * 6.arguments:参数
         */
        String exchangeName = "test_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6.创建队列
        String queue1Name="test_fanout_queue1";
        String queue2Name="test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7.绑定队列和交换机
        /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                * 1.queue:队列名称
                * 2.exchange:交换机名称
                * 3.routingKey:路由键,绑定规则
                    如果交换机的类型为fanout,routingKey设置为"",意味着交换机会把消息分发给每一个与之绑定的队列
         */
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");
        //8.发送消息
        String body="日志信息: 张三调用了findAll方法------日志级别:info-----";
        channel.basicPublish(exchangeName,"",null,body.getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}
image.png
image.png

4.消费者代码


image.png
public class ConsumerPubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
        factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
        factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
        factory.setUsername("hhb");             //用户名;默认是 guest
        factory.setPassword("hhb");             //密码;默认是 guest
        //3.创建连接 Connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.接收消息
        /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                * 1.queue: 队列名称
                * 2.autoAck:是否自动确认
                * 3.callback:回调对象
        */
        Consumer consumer=new DefaultConsumer(channel){
            /*
                 回调方法,当收到消息后,会自动执行该方法。
                 1.consumerTag:标识
                 2.envelope:获取一些信息,交换机,路由key
                 3.properties:配置信息
                 4.body:真实数据
            */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag: "+consumerTag);
//                System.out.println("envelope: "+envelope.getExchange());
//                System.out.println("properties: "+properties);
                System.out.println("body: "+ new String(body));
                System.out.println("把日志打印到控制台");
            }
        };
        String queue1Name="test_fanout_queue1";
        channel.basicConsume(queue1Name,true,consumer);

        //6.不要关闭资源

    }
}

ConsumerPubSub1 和ConsumerPubSub2代码基本一致,区别在于对于日志信息打印不同。


image.png

5.3Rounting路由模式

1.模式说明(交换机根据不同的路由key选择不同的队列,交换机类型为BuiltinExchangeType.DIRECT)


image.png

2.生产者代码


image.png
public class ProducerRounting {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
        factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
        factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
        factory.setUsername("hhb");             //用户名;默认是 guest
        factory.setPassword("hhb");             //密码;默认是 guest
        //3.创建连接 Connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
       //5.创建交换机
        /*
            exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
            参数:
                * 1.exchange:交换机名称
                * 2.type:交换机类型 ,枚举
                    DIRECT("direct"), 定向
                    FANOUT("fanout"), 扇形(广播),发送消息给每一个与子绑定队列
                    TOPIC("topic"),   通配符
                    HEADERS("headers");参数匹配
                * 3.durable:是否持久化
                * 4.autoDelete:自动删除
                * 5.internal:内部使用。一般false
                * 6.arguments:参数
         */
        String exchangeName = "test_direct";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //6.创建队列
        String queue1Name="test_direct_queue1";
        String queue2Name="test_direct_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7.绑定队列和交换机
        /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                * 1.queue:队列名称
                * 2.exchange:交换机名称
                * 3.routingKey:路由键,绑定规则
                    如果交换机的类型为fanout,routingKey设置为"",意味着交换机会把消息分发给每一个与之绑定的队列
         */
        //队列1的绑定
        channel.queueBind(queue1Name,exchangeName,"error");
        //队列2的绑定
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");
        //8.发送消息
        String body="日志信息: 张三调用了findAll方法------日志级别:info-----";
        channel.basicPublish(exchangeName,"info",null,body.getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}

运行后:


image.png
image.png

3.消费者代码


image.png
//这是ConsumerRounting1代码
public class ConsumerRounting1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
        factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
        factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
        factory.setUsername("hhb");             //用户名;默认是 guest
        factory.setPassword("hhb");             //密码;默认是 guest
        //3.创建连接 Connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.接收消息
        /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                * 1.queue: 队列名称
                * 2.autoAck:是否自动确认
                * 3.callback:回调对象
        */
        Consumer consumer=new DefaultConsumer(channel){
            /*
                 回调方法,当收到消息后,会自动执行该方法。
                 1.consumerTag:标识
                 2.envelope:获取一些信息,交换机,路由key
                 3.properties:配置信息
                 4.body:真实数据
            */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag: "+consumerTag);
//                System.out.println("envelope: "+envelope.getExchange());
//                System.out.println("properties: "+properties);
                System.out.println("body: "+ new String(body));
            }
        };
        String queue1Name="test_direct_queue1";
        channel.basicConsume(queue1Name,true,consumer);

        //6.不要关闭资源

    }
}

当我们执行ConsumerRounting2代码时,打印了日志信息。ConsumerRounting2和ConsumerRounting1代码基本一样,只是监听的队列名不一样;


image.png
image.png

小结:


image.png

5.4Topics通配符模式

1.模式 (交换机类型类型为BuiltinExchangeType.TOPIC)


image.png

2.生产者代码


image.png
public class ProducerTopics {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
        factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
        factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
        factory.setUsername("hhb");             //用户名;默认是 guest
        factory.setPassword("hhb");             //密码;默认是 guest
        //3.创建连接 Connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
       //5.创建交换机
        /*
            exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
            参数:
                * 1.exchange:交换机名称
                * 2.type:交换机类型 ,枚举
                    DIRECT("direct"), 定向
                    FANOUT("fanout"), 扇形(广播),发送消息给每一个与子绑定队列
                    TOPIC("topic"),   通配符
                    HEADERS("headers");参数匹配
                * 3.durable:是否持久化
                * 4.autoDelete:自动删除
                * 5.internal:内部使用。一般false
                * 6.arguments:参数
         */
        String exchangeName = "test_Topics";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        //6.创建队列
        String queue1Name="test_Topics_queue1";
        String queue2Name="test_Topics_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7.绑定队列和交换机
        /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                * 1.queue:队列名称
                * 2.exchange:交换机名称
                * 3.routingKey:路由键,绑定规则
                    如果交换机的类型为fanout,routingKey设置为"",意味着交换机会把消息分发给每一个与之绑定的队列
         */
        //rounting key 假如由   系统的名称.日志的级别  这两部分组成
        //需求:
        //  队列1:所有error级别的日志存入数据库,所有order系统的日志存入数据库。
        //  队列2:所有日志信息打印到控制台
        //队列1的绑定
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");
        //队列2的绑定
        channel.queueBind(queue2Name,exchangeName,"*.*");
        //8.发送消息
        String body="日志信息: 张三调用了findAll方法------日志级别:info-----";
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
        //9.释放资源
        channel.close();
        connection.close();
    }
}

生产者代码执行后:


image.png
image.png

3.消费者代码


image.png
image.png
代码:
public class ConsumerTopic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.2.34"); //rabbitmq server 所在的宿主机的ip地址  ;默认是localhost
        factory.setPort(5672); //rabbitmq server 服务器的对外暴露的端口号;默认是5672
        factory.setVirtualHost("/hhb");         //虚拟机;默认值是 /
        factory.setUsername("hhb");             //用户名;默认是 guest
        factory.setPassword("hhb");             //密码;默认是 guest
        //3.创建连接 Connection
        Connection connection = factory.newConnection();
        //4.创建channel
        Channel channel = connection.createChannel();
        //5.接收消息
        /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                * 1.queue: 队列名称
                * 2.autoAck:是否自动确认
                * 3.callback:回调对象
        */
        Consumer consumer=new DefaultConsumer(channel){
            /*
                 回调方法,当收到消息后,会自动执行该方法。
                 1.consumerTag:标识
                 2.envelope:获取一些信息,交换机,路由key
                 3.properties:配置信息
                 4.body:真实数据
            */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag: "+consumerTag);
//                System.out.println("envelope: "+envelope.getExchange());
//                System.out.println("properties: "+properties);
                System.out.println("body: "+ new String(body));
                System.out.println("error级别的日志,所有order系统的日志已存入数据库。");
            }
        };
        String queue1Name="test_Topics_queue1";
        channel.basicConsume(queue1Name,true,consumer);

        //6.不要关闭资源

    }
}

ConsumerTopic1和ConsumerTopic2代码差不多。
小结:


image.png

5.5 工作模式总结

image.png

六、Spring整合RabbitMQ

6.1 生产者

(1)创建生产者工程naven工程(spring-rabbitmq-producer)
(2)添加依赖

<dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.2.2.RELEASE</version>
        </dependency>
        <!-- spring整合rabbitmq的依赖包-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.2.2.RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

(3)配置整合
1.rabbitmq.properties

rabbitmq.host=192.168.2.34
rabbitmq.port=5672
rabbitmq.username=hhb
rabbitmq.password=hhb
rabbitmq.virtual-host=/hhb
  2.spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="rabbitmq.properties"/>

    <!--定义rabbitmq connectionFactory-->
    <rabbit:connection-factory
        id="connectionFactory"
        host="${rabbitmq.host}"
        port="${rabbitmq.port}"
        username="${rabbitmq.username}"
        password="${rabbitmq.password}"
        virtual-host="${rabbitmq.virtual-host}"/>

    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~简单方式~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <!--定义持久化队列,不存在则自动创建,不绑定到交换机则绑定到默认交换机
     默认交换机类型为direct,名字为:"",路由键为队列的名称-->
    <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播:所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <!--定义广播交换机中的持久化队列,不存在则自动创建 -->
    <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>

    <!--定义广播交换机中的持久化队列,不存在则自动创建 -->
    <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

    <!--定义广播类型交换机,并绑定上述两个队列 -->
    <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="spring_fanout_queue_1"/>
            <rabbit:binding queue="spring_fanout_queue_2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符:*匹配一个单词,#匹配多个单词~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <!--定义广播交换机中的持久化队列,不存在则自动创建 -->
    <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>

    <!--定义广播交换机中的持久化队列,不存在则自动创建 -->
    <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>

    <!--定义广播交换机中的持久化队列,不存在则自动创建 -->
    <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>

    <!--定义广播类型交换机,并绑定上述两个队列 -->
    <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="hhb.*" queue="spring_topic_queue_star"/>
            <rabbit:binding pattern="hhb.#" queue="spring_topic_queue_well"/>
            <rabbit:binding pattern="it.#" queue="spring_topic_queue_well2"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息 -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>

(4)编写代码发送消息


image.png

代码:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
    //1.注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testHelloWorld(){
        //2.发送消息
        rabbitTemplate.convertAndSend("spring_queue","hello world spring");
    }
    /**
     *发送fanout消息
     */
    @Test
    public void testFanout(){
        //2.发送消息
        rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout...");
    }

    /**
     *发送topic消息
     */
    @Test
    public void testTopic(){
        //2.发送消息
        rabbitTemplate.convertAndSend("spring_topic_exchange","hhb.nice","spring topic...");
    }
}

执行代码以后:


image.png

6.2 消费者

(1)创建消费者工程 maven工程 (spring-rabbitmq-consumer)
(2)添加依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.2.2.RELEASE</version>
        </dependency>
        <!-- spring整合rabbitmq的依赖包-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.2.2.RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

(3)配置文件
1.rabbitmq.properties 与生产者一样
2.spring-rabbitmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="rabbitmq.properties"/>

    <!--定义rabbitmq connectionFactory-->
    <rabbit:connection-factory
        id="connectionFactory"
        host="${rabbitmq.host}"
        port="${rabbitmq.port}"
        username="${rabbitmq.username}"
        password="${rabbitmq.password}"
        virtual-host="${rabbitmq.virtual-host}"/>
    <!--消费代码类(消息监听器)-->
    <bean id="springQueueListener" class="com.hhb.rabbitmq.listener.SpringQueueListener"/>
    <bean id="fanoutListener1" class="com.hhb.rabbitmq.listener.FanoutListener1"/>
    <bean id="fanoutListener2" class="com.hhb.rabbitmq.listener.FanoutListener2"/>
    <bean id="topicListenerStar" class="com.hhb.rabbitmq.listener.TopicListenerStar"/>
    <bean id="topicListenerWell" class="com.hhb.rabbitmq.listener.TopicListenerWell"/>
    <bean id="topicListenerWell2" class="com.hhb.rabbitmq.listener.TopicListenerWell2"/>

    <!--监听器容器 监听器监听哪个队列-->
    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
        <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
        <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
        <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
        <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
        <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
        <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
    </rabbit:listener-container>
</beans>

(4)编写消息监听器


image.png

以其中SpringQueueListener为例(其他类都是一样的写法):


image.png
编写一个测试类,进行测试:
image.png

七、SpringBoot整合RabbitMQ

7.1生产者

步骤:


image.png

1.创建生产者SpringBoot工程
2.引入依赖
3.编写yml配置

spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    addresses: 192.168.2.34:5672

4.定义交换机、队列,绑定交换机队列


image.png
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    public  static final String EXCHANGE_NAME="boot_topic_exchange";
    public  static final String QUEUE_NAME="boot_queue";
    //1.交换机
    @Bean("bootExchange")
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    //2.queue队列
    @Bean("bootQueue")
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    //3.交换机和队列绑定
    /*
        1.知道哪个队列
        2.知道哪个交换机
        3.routing key
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}

5.发送消息


image.png
image.png

7.2消费者

image.png

1.创建生产者SpringBoot工程
2.引入依赖
3.编写yml配置

spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    addresses: 192.168.2.34:5672

4.监听类


image.png

启动主启动类:


image.png
小结:
image.png

相关文章

网友评论

      本文标题:RabbitMQ-消息中间件

      本文链接:https://www.haomeiwen.com/subject/nkyrnktx.html