美文网首页
消息队列之rabbitMq

消息队列之rabbitMq

作者: 黑客和白帽子的故事 | 来源:发表于2018-01-24 20:06 被阅读0次

    简介:

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ.开源产品 activeMQ,RabbitMQ 阿里RocketMQ(摘自百度)

    使用场景:

    在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
    举例:

    以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。

    RabbitMQ 特点:

    RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

    • 1.可靠性(Reliability)
      RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

    • 2.灵活的路由(Flexible Routing)
      在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

    • 3.消息集群(Clustering)
      多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

    • 4.高可用(Highly Available Queues)
      队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

    • 5.多种协议(Multi-protocol)
      RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

    • 6.多语言客户端(Many Clients)
      RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

    • 7.管理界面(Management UI)
      RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

    • 8.跟踪机制(Tracing)
      如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

    • 9.插件机制(Plugin System)
      RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

    词语解释:

    image.png

    Brocker:消息队列服务器实体。

    Exchange:消息交换机,指定消息按什么规则,路由到哪个队列。

    Queue:消息队列,每个消息都会被投入到一个或者多个队列里。

    Binding:绑定,它的作用是把exchange和queue按照路由规则binding起来。

    Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

    Vhost:虚拟主机,一个broker里可以开设多个vhost,用作不用用户的权限分离。RabbitMQ 默认的 vhost 是 /

    Producer:消息生产者,就是投递消息的程序。

    Consumer:消息消费者,就是接受消息的程序。

    Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

    RabbitMQ使用场景:

    学习RabbitMQ的使用场景,来自官方教程:https://www.rabbitmq.com/getstarted.html

    添加pom.xml

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
    • 场景1:单发送单接收
      使用场景:简单的发送与接收,没有特别的处理。
    Queue

    生产者:

     public class Producer {
    
        private final static String QUEUE_NAME = "test";
    
        public static void main(String[] argv) throws Exception {
    
             /* 创建连接工厂 */
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
            /* 创建连接 */
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)
            // 第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 需发送的信息
            String message = "Hello World!";
            //第一个参数为交换机名称、
            // 第二个参数为队列映射的路由key、
            // 第三个参数为消息的其他属性、
            // 第四个参数为发送信息的主体
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    

    消费端 :

    public class Consumer {
    
        private final static String QUEUE_NAME = "test";
    
        public static void main(String[] argv) throws Exception {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
            /* 创建连接 */
            Connection connection = factory.newConnection();
             /* 创建信道 */
            Channel channel = connection.createChannel();
    
            // 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
    
            //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
            // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Customer Received '" + message + "'");
                }
            };
    
            // 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    
    

    结果:


    生产 者 消费者
    • 场景2:单发送多接收
      使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。
    Work queues

    生产端:

    public class Producer {
    
        private static final String TASK_QUEUE_NAME = "work_quene";
    
        public static void main(String[] argv) throws Exception {
    
            /* 创建连接工厂 */
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
    
               /* 创建连接 */
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
            channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
            //分发信息
            for (int i = 0; i < 10; i++) {
                String message = "Hello RabbitMQ" + i;
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println("NewTask send '" + message + "'");
            }
            channel.close();
            connection.close();
        }
    }
    

    消费者端:

    package com.rabbitmq.mq.workqueues;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer {
    
        private static final String TASK_QUEUE_NAME = "work_quene";
    
        public static void main(String[] argv) throws Exception {
                    /* 创建连接工厂 */
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
    
               /* 创建连接 */
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    
            //  保证在接收端一个消息没有处理完时不会接收另一个消息,即接收端发送了ack后才会接收下一个消息。在这种情况下发送端会尝试把消息发送给下一个not busy的接收端
            channel.basicQos(1);
    
            final DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Worker1  Received '" + message + "'");
                    try {
                        doWork(message);
                    }catch (Exception e){
                        channel.abort();
                    }finally {
                        System.out.println("Worker1 Done");
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                }
            };
            //如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出
            boolean autoAck=false;
    
            //消息消费完成确认
            channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
        }
    
        private static  void doWork(String task) {
            try {
                Thread.sleep(1000); // 暂停1秒钟
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    
    
    }
    

    ps:消费者1和消费者2代码相同。

    结果:

    生产者 消费者1 消费者2
    • 场景3:Publish/Subscribe

    使用场景:发布、订阅模式,发送端发送广播消息,多个接收端接收。

    发布/订阅

    生产者:

    package com.rabbitmq.mq.subscribe;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Producer {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws Exception {
    
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
    
           /* 创建连接 */
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //发送消息到一个名为“logs”的exchange上,使用“fanout”方式发送,即广播消息 所有的消费者 得到同样的队列信息
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    
            //分发信息
            for (int i=0;i<5;i++){
                String message="Hello World"+i;
                channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
                System.out.println("EmitLog Sent '" + message + "'");
            }
            channel.close();
            connection.close();
        }
    }
    

    消费者:

    public class Consumer {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //产生一个随机的队列名称
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定
    
            System.out.println("ReceiveLogs1 Waiting for messages");
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("ReceiveLogs1 Received '" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);//队列会自动删除
    
        }
    }
    

    结果:


    生产者 订阅者
    • 场景4:Routing (按路线发送接收)

    使用场景:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。

    路由

    生产者:

    public class RoutingProducer {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        // 路由关键字
        private static final String[] routingKeys = new String[]{"info", "warning", "error"};
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");//注意是direct
    
            //发送信息
            for (String routingKey : routingKeys) {
                String message = "RoutingSendDirect Send the message level:" + routingKey;
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
                System.out.println("RoutingSendDirect Send" + routingKey + "':'" + message);
            }
            channel.close();
            connection.close();
    
        }
    }
    

    消费者1:

    /**
     * 消费者1
     */
    public class RoutingWorker1 {
        // 交换器名称
        private static final String EXCHANGE_NAME = "direct_logs";
        // 路由关键字
        private static final String[] routingKeys = new String[]{"info" ,"warning"};
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //声明交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //获取匿名队列名称
            String queueName=channel.queueDeclare().getQueue();
    
            //根据路由关键字进行绑定
            for (String routingKey:routingKeys){
                channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
                System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+"," +
                        " queue:"+queueName+", BindRoutingKey:" + routingKey);
            }
            System.out.println("ReceiveLogsDirect1  Waiting for messages");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    
    

    消费者2:

    /**
     * 消费者2
     */
    public class RoutingWorker2 {
        // 交换器名称
        private static final String EXCHANGE_NAME = "direct_logs";
        // 路由关键字
        private static final String[] routingKeys = new String[]{"error"};
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //声明交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //获取匿名队列名称
            String queueName=channel.queueDeclare().getQueue();
    
            //根据路由关键字进行绑定
            for (String routingKey:routingKeys){
                channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
                System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+"," +
                        " queue:"+queueName+", BindRoutingKey:" + routingKey);
            }
            System.out.println("ReceiveLogsDirect1  Waiting for messages");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
    

    结果:


    生产者 消费者1 消费者2
    • 场景5:Topics (按topic发送接收)

    使用场景:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。(这种应该属于模糊匹配)

    topic

    生产者:

    public class TopicProducer {
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
    
            Connection connection=factory.newConnection();
            Channel channel=connection.createChannel();
    
            //声明一个匹配模式的交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");
    
            //待发送的消息
            String[] routingKeys=new String[]{
                    "quick.orange.rabbit",
                    "lazy.orange.elephant",
                    "quick.orange.fox",
                    "lazy.brown.fox",
                    "quick.brown.fox",
                    "quick.orange.male.rabbit",
                    "lazy.orange.male.rabbit"
            };
    
            //发送消息
            for(String severity :routingKeys){
                String message = "From "+severity+" routingKey' s message!";
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
                System.out.println("TopicSend Sent '" + severity + "':'" + message + "'");
            }
    
            channel.close();
            connection.close();
            
        }
    }
    
    

    消费者1:

    
    public class TopicConsumer1 {
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //声明一个匹配模式的交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String queueName = channel.queueDeclare().getQueue();
    
            //路由关键字
            String[] routingKeys = new String[]{"*.orange.*"};
    
            //绑定路由
            for (String routingKey : routingKeys) {
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
                System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey);
            }
            System.out.println("ReceiveLogsTopic1 Waiting for messages");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
    
            channel.basicConsume(queueName, true, consumer);
    
        }
    }
    

    消费者2:

    public class TopicConsumer2 {
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
    
            //声明一个匹配模式的交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String queueName = channel.queueDeclare().getQueue();
    
            // 路由关键字
            String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
    //      绑定路由关键字
            for (String bindingKey : routingKeys) {
                channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
                System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
            }
    
            System.out.println("ReceiveLogsTopic2 Waiting for messages");
    
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
                    String message = new String(body, "UTF-8");
                    System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
    
        }
    
    }
    

    结果:


    生产者 消费者1 消费者2

    相关文章

      网友评论

          本文标题:消息队列之rabbitMq

          本文链接:https://www.haomeiwen.com/subject/wkvooxtx.html