美文网首页
RabbitMq工作模式

RabbitMq工作模式

作者: striveSmile | 来源:发表于2018-12-28 12:45 被阅读0次

    1、 工作模式

    RabbitMQ有以下几种工作模式 :
    1、Work queues
    2、Publish/Subscribe
    3、Routing
    4、Topics
    5、Header
    6、RPC

    1.1 Work queues(工作队列)

    image.png

    work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
    应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
    测试:
    1、使用入门程序,启动多个消费者。
    2、生产者发送多个消息。

    public class Producer01 {
    
        private static final String QUEUE = "helloworld";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = null;
            Channel channel = null;
            try {
    
            //创建连接工程
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务,一个mq有多个虚拟机
    
            //创建与RabbitMQ服务的TCP连接
             connection = connectionFactory.newConnection();
            //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
             channel = connection.createChannel();
            /**
             * 声明队列,如果Rabbit中没有此队列将自动创建
             * QUEUE:队列名称
             * durable:是否持久化
             * exclusive:队列是否独占此链接
             * autoDelete:队列不再使用时是否自动删除此
             * arguments:队列参数
             */
            channel.queueDeclare(QUEUE, true, false, false, null);
    
            String message = "helloworld小明"+System.currentTimeMillis();
    
            /**
             * 消息发布方法
             * exchange: Exchange的名称,如果没有指定,则使用Default Exchange
             * routingKey:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
             * props: 消息包含的属性
             * body: 消息体
             * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
             * 默认的交换机,routingKey等于队列名称
             */
            channel.basicPublish("", QUEUE, null, message.getBytes());
            System.out.println("Send Message is:'" + message + "'");
    
            }catch (Exception e){
                e.printStackTrace();
            }finally {
    
                if(channel != null)
                {
                    channel.close();
                }
                if(connection != null)
                {
                    connection.close();
                }
            }
    
        }
    
    }
    

    消费者

    public class Consumer01 {
    
        private static final String QUEUE = "helloworld";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = null;
            Channel channel = null;
    
            //创建连接工程
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务,一个mq有多个虚拟机
    
            //创建与RabbitMQ服务的TCP连接
            connection = connectionFactory.newConnection();
            //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
            channel = connection.createChannel();
            /**
             * 声明队列,如果Rabbit中没有此队列将自动创建
             * QUEUE:队列名称
             * durable:是否持久化
             * exclusive:队列是否独占此链接
             * autoDelete:队列不再使用时是否自动删除此
             * arguments:队列参数
             */
            channel.queueDeclare(QUEUE, true, false, false, null);
    
            /**
             * 定义消费方法
             */
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                /**
                 *
                 * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
                 * @param properties  properties
                 * @param body
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    //交换机
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    //消息id
                    long deliveryTag = envelope.getDeliveryTag();
                    //消息内容
                    String msg = new String(body,"UTF-8");
                    System.out.println("receive message.." + msg);
    
    
                }
            };
    
            /**
             *监听队列String queue, boolean autoAck, Consumer callback
             * queue 队列名称
             * autoAck 是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复
             * callback 消费消息的方法,消费者接收到消息后调用此方法
             */
            channel.basicConsume(QUEUE,true,defaultConsumer);
        }
    
    }
    

    结果:
    1、一条消息只会被一个消费者接收;
    2、rabbit采用轮询的方式将消息是平均发送给消费者的;
    3、消费者在处理完某条消息后,才会收到下一条消息。

    1.2 Publish/subscribe(发布订阅)

    1.2.1 工作模式
    image.png

    案例:
    用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法

    1、生产者

    声明Exchange_fanout_inform交换机。
    声明两个队列并且绑定到此交换机,绑定时不需要指定routingkey
    发送消息时不需要指定routingkey

    public class Producer02_publish {
        //队列名称
        private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
    
        public static void main(String[] args) {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            Connection connection = null;
            Channel channel = null;
            try {
                //建立新连接
                connection = connectionFactory.newConnection();
                //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
                channel = connection.createChannel();
                //声明队列,如果队列在mq 中没有则要创建
                //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                /**
                 * 参数明细
                 * 1、queue 队列名称
                 * 2、durable 是否持久化,如果持久化,mq重启后队列还在
                 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
                 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
                 * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
                 */
                channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
                channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
                //声明一个交换机
                //参数:String exchange, String type
                /**
                 * 参数明细:
                 * 1、交换机的名称
                 * 2、交换机的类型
                 * fanout:对应的rabbitmq的工作模式是 publish/subscribe
                 * direct:对应的Routing    工作模式
                 * topic:对应的Topics工作模式
                 * headers: 对应的headers工作模式
                 */
                channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
                //进行交换机和队列绑定
                //参数:String queue, String exchange, String routingKey
                /**
                 * 参数明细:
                 * 1、queue 队列名称
                 * 2、exchange 交换机名称
                 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
                 */
                channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
                channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
                //发送消息
                //参数:String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细:
                 * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
                 * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
                 * 3、props,消息的属性
                 * 4、body,消息内容
                 */
                for(int i=0;i<5;i++){
                    //消息内容
                    String message = "send inform message to user";
                    channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());
                    System.out.println("send to mq "+message);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭连接
                //先关闭通道
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
    
        }
    }
    
    
    2、邮件发送消费者
    public class Consumer02_subscribe_sms {
    
        //队列名称
        private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = null;
            Channel channel = null;
    
            //创建连接工程
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务,一个mq有多个虚拟机
    
            //创建与RabbitMQ服务的TCP连接
            connection = connectionFactory.newConnection();
            //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
            channel = connection.createChannel();
    
            /**
             * String exchange, String type
             * exchange:交换机名字
             * type 交换机类型
             */
    
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
    
            /**
             * 声明队列,如果Rabbit中没有此队列将自动创建
             * QUEUE:队列名称
             * durable:是否持久化
             * exclusive:队列是否独占此链接
             * autoDelete:队列不再使用时是否自动删除此
             * arguments:队列参数
             */
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
    
            /**
             * 交换机和队列绑定
             * String queue, String exchange, String routingKey
             * queue 队列名称
             * exchange 交换机名称
             * routingKey 路由key
             */
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
    
            /**
             * 定义消费方法
             */
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                /**
                 *
                 * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
                 * @param properties  properties
                 * @param body
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机
                    String exchange = envelope.getExchange();
                    //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    //消息内容
                    String message= new String(body,"utf-8");
                    System.out.println("receive message:"+message);
    
                }
            };
    
            /**
             *监听队列String queue, boolean autoAck, Consumer callback
             * queue 队列名称
             * autoAck 是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复
             * callback 消费消息的方法,消费者接收到消息后调用此方法
             */
            channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
        }
    
    }
    

    按照上边的代码,编写邮件通知的消费代码。
    3、短信发送消费者
    参考上边的邮件发送消费者代码编写.

    1.2.4 思考

    1、publish/subscribe与work queues有什么区别。

    区别:
    1)work queues不用定义交换机,而publish/subscribe需要定义交换机。

    2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。

    3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑定到默认的交换机 。

    相同点:

    1、所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。

    2、实质工作用什么 publish/subscribe还是work queues。
    建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大,并且发布订阅模式可以指定自己专用的交换机。

    1.3 Routing()

    1.3.1 工作模式
    image.png

    路由模式:
    1、每个消费者监听自己的队列,并且设置routingkey。
    2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列

    1.3.2代码
    1、生产者

    声明exchange_routing_inform交换机。
    声明两个队列并且绑定到此交换机,绑定时需要指定routingkey
    发送消息时需要指定routingkey

    public class Producer03_routing {
    
        //队列名称
        private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
        private static final String ROUTINGKEY_EMAIL="inform_email";
        private static final String ROUTINGKEY_SMS="inform_sms";
    
        public static void main(String[] args) {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            Connection connection = null;
            Channel channel = null;
            try {
                //建立新连接
                connection = connectionFactory.newConnection();
                //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
                channel = connection.createChannel();
                //声明队列,如果队列在mq 中没有则要创建
                //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                /**
                 * 参数明细
                 * 1、queue 队列名称
                 * 2、durable 是否持久化,如果持久化,mq重启后队列还在
                 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
                 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
                 * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
                 */
                channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
                channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
                //声明一个交换机
                //参数:String exchange, String type
                /**
                 * 参数明细:
                 * 1、交换机的名称
                 * 2、交换机的类型
                 * fanout:对应的rabbitmq的工作模式是 publish/subscribe
                 * direct:对应的Routing    工作模式
                 * topic:对应的Topics工作模式
                 * headers: 对应的headers工作模式
                 */
                channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
                //进行交换机和队列绑定
                //参数:String queue, String exchange, String routingKey
                /**
                 * 参数明细:
                 * 1、queue 队列名称
                 * 2、exchange 交换机名称
                 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
                 */
                channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
                channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);
                //发送消息
                //参数:String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细:
                 * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
                 * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
                 * 3、props,消息的属性
                 * 4、body,消息内容
                 */
                  for(int i=0;i<5;i++){
                    //发送消息的时候指定routingKey
                    String message = "send email inform message to user";
                    channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());
                    System.out.println("send to mq "+message);
                }
                for(int i=0;i<5;i++){
                    //发送消息的时候指定routingKey
                    String message = "send email inform message to user";
                    channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes());
                    System.out.println("send to mq "+message);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭连接
                //先关闭通道
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    }
    

    2、邮件发送消费者

    public class Consumer03_routing_email {
    
        //队列名称
        private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
        private static final String ROUTINGKEY_EMAIL="inform_email";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = null;
            Channel channel = null;
    
            //创建连接工程
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务,一个mq有多个虚拟机
    
            //创建与RabbitMQ服务的TCP连接
            connection = connectionFactory.newConnection();
            //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
            channel = connection.createChannel();
    
            /**
             * String exchange, String type
             * exchange:交换机名字
             * type 交换机类型
             */
    
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
    
            /**
             * 声明队列,如果Rabbit中没有此队列将自动创建
             * QUEUE:队列名称
             * durable:是否持久化
             * exclusive:队列是否独占此链接
             * autoDelete:队列不再使用时是否自动删除此
             * arguments:队列参数
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    
            /**
             * 交换机和队列绑定
             * String queue, String exchange, String routingKey
             * queue 队列名称
             * exchange 交换机名称
             * routingKey 路由key
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
    
            /**
             * 定义消费方法
             */
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                /**
                 *
                 * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
                 * @param properties  properties
                 * @param body
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    //交换机
                    String exchange = envelope.getExchange();
                    //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    //消息内容
                    String message= new String(body,"utf-8");
                    System.out.println("receive message:"+message);
                }
            };
    
            /**
             *监听队列String queue, boolean autoAck, Consumer callback
             * queue 队列名称
             * autoAck 是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复
             * callback 消费消息的方法,消费者接收到消息后调用此方法
             */
            channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
        }
    
    }
    

    3、短信发送消费者
    参考邮件发送消费者的代码流程,编写短信通知的代码

    1.4.4思考

    1、Routing模式和Publish/subscibe有啥区别?
    Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。

    1.4 Topics(通配符模式)

    1.4.1工作模式

    image.png

    路由模式:
    1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
    2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

    1.4.2代码

    案例:
    根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效

    1、生产者

    声明交换机,指定topic类型:

    1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
    2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列

    1.4.2代码
    案例:
    根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。

    1、生产者
    声明交换机,指定topic类型:

    public class Producer04_topics {
    
        //队列名称
        private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
        private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
        private static final String ROUTINGKEY_SMS="inform.#.sms.#";
    
        public static void main(String[] args) {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            Connection connection = null;
            Channel channel = null;
            try {
                //建立新连接
                connection = connectionFactory.newConnection();
                //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
                channel = connection.createChannel();
                //声明队列,如果队列在mq 中没有则要创建
                //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                /**
                 * 参数明细
                 * 1、queue 队列名称
                 * 2、durable 是否持久化,如果持久化,mq重启后队列还在
                 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
                 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
                 * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
                 */
                channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
                channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
                //声明一个交换机
                //参数:String exchange, String type
                /**
                 * 参数明细:
                 * 1、交换机的名称
                 * 2、交换机的类型
                 * fanout:对应的rabbitmq的工作模式是 publish/subscribe
                 * direct:对应的Routing    工作模式
                 * topic:对应的Topics工作模式
                 * headers: 对应的headers工作模式
                 */
                channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
                //进行交换机和队列绑定
                //参数:String queue, String exchange, String routingKey
                /**
                 * 参数明细:
                 * 1、queue 队列名称
                 * 2、exchange 交换机名称
                 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
                 */
                channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
                channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
                //发送消息
                //参数:String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细:
                 * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
                 * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
                 * 3、props,消息的属性
                 * 4、body,消息内容
                 */
                for(int i=0;i<5;i++){
                    //发送消息的时候指定routingKey
                    String message = "send email inform message to user";
                    channel.basicPublish(EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());
                    System.out.println("send to mq "+message);
                }
              /*  for(int i=0;i<5;i++){
                    //发送消息的时候指定routingKey
                    String message = "send email inform message to user";
                    channel.basicPublish(EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS,null,message.getBytes());
                    System.out.println("send to mq "+message);
                }*/
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭连接
                //先关闭通道
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    }
    
    2、消费端

    队列绑定交换机指定通配符:
    统配符规则:
    中间以“.”分隔。
    符号#可以匹配多个词,符号*可以匹配一个词语。

    1.4.4思考

    1、本案例的需求使用Routing工作模式能否实现?
    使用Routing模式也可以实现本案例,共设置三个 routingkey,分别是email、sms、all,email队列绑定email和all,sms队列绑定sms和all,这样就可以实现上边案例的功能,实现过程比topics复杂。
    Topic模式更多加强大,它可以实现Routing、publish/subscirbe模式的功能。

    相关文章

      网友评论

          本文标题:RabbitMq工作模式

          本文链接:https://www.haomeiwen.com/subject/rccrlqtx.html