美文网首页
消息队列之rabbitMq

消息队列之rabbitMq

作者: 黑客和白帽子的故事 | 来源:发表于2018-01-24 20:06 被阅读0次

简介:

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ.开源产品 activeMQ,RabbitMQ 阿里RocketMQ(摘自百度)

使用场景:

在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
举例:

以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。

RabbitMQ 特点:

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

  • 1.可靠性(Reliability)
    RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

  • 2.灵活的路由(Flexible Routing)
    在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

  • 3.消息集群(Clustering)
    多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

  • 4.高可用(Highly Available Queues)
    队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

  • 5.多种协议(Multi-protocol)
    RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

  • 6.多语言客户端(Many Clients)
    RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

  • 7.管理界面(Management UI)
    RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

  • 8.跟踪机制(Tracing)
    如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

  • 9.插件机制(Plugin System)
    RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

词语解释:

image.png

Brocker:消息队列服务器实体。

Exchange:消息交换机,指定消息按什么规则,路由到哪个队列。

Queue:消息队列,每个消息都会被投入到一个或者多个队列里。

Binding:绑定,它的作用是把exchange和queue按照路由规则binding起来。

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

Vhost:虚拟主机,一个broker里可以开设多个vhost,用作不用用户的权限分离。RabbitMQ 默认的 vhost 是 /

Producer:消息生产者,就是投递消息的程序。

Consumer:消息消费者,就是接受消息的程序。

Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

RabbitMQ使用场景:

学习RabbitMQ的使用场景,来自官方教程:https://www.rabbitmq.com/getstarted.html

添加pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 场景1:单发送单接收
    使用场景:简单的发送与接收,没有特别的处理。
Queue

生产者:

 public class Producer {

    private final static String QUEUE_NAME = "test";

    public static void main(String[] argv) throws Exception {

         /* 创建连接工厂 */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        /* 创建连接 */
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)
        // 第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 需发送的信息
        String message = "Hello World!";
        //第一个参数为交换机名称、
        // 第二个参数为队列映射的路由key、
        // 第三个参数为消息的其他属性、
        // 第四个参数为发送信息的主体
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

消费端 :

public class Consumer {

    private final static String QUEUE_NAME = "test";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        /* 创建连接 */
        Connection connection = factory.newConnection();
         /* 创建信道 */
        Channel channel = connection.createChannel();

        // 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);


        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
        // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Customer Received '" + message + "'");
            }
        };

        // 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

结果:


生产 者 消费者
  • 场景2:单发送多接收
    使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。
Work queues

生产端:

public class Producer {

    private static final String TASK_QUEUE_NAME = "work_quene";

    public static void main(String[] argv) throws Exception {

        /* 创建连接工厂 */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

           /* 创建连接 */
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        //分发信息
        for (int i = 0; i < 10; i++) {
            String message = "Hello RabbitMQ" + i;
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("NewTask send '" + message + "'");
        }
        channel.close();
        connection.close();
    }
}

消费者端:

package com.rabbitmq.mq.workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {

    private static final String TASK_QUEUE_NAME = "work_quene";

    public static void main(String[] argv) throws Exception {
                /* 创建连接工厂 */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

           /* 创建连接 */
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        //  保证在接收端一个消息没有处理完时不会接收另一个消息,即接收端发送了ack后才会接收下一个消息。在这种情况下发送端会尝试把消息发送给下一个not busy的接收端
        channel.basicQos(1);

        final DefaultConsumer consumer = new DefaultConsumer(channel) {

            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Worker1  Received '" + message + "'");
                try {
                    doWork(message);
                }catch (Exception e){
                    channel.abort();
                }finally {
                    System.out.println("Worker1 Done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        //如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出
        boolean autoAck=false;

        //消息消费完成确认
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    }

    private static  void doWork(String task) {
        try {
            Thread.sleep(1000); // 暂停1秒钟
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }


}

ps:消费者1和消费者2代码相同。

结果:

生产者 消费者1 消费者2
  • 场景3:Publish/Subscribe

使用场景:发布、订阅模式,发送端发送广播消息,多个接收端接收。

发布/订阅

生产者:

package com.rabbitmq.mq.subscribe;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {


        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

       /* 创建连接 */
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //发送消息到一个名为“logs”的exchange上,使用“fanout”方式发送,即广播消息 所有的消费者 得到同样的队列信息
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        //分发信息
        for (int i=0;i<5;i++){
            String message="Hello World"+i;
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
            System.out.println("EmitLog Sent '" + message + "'");
        }
        channel.close();
        connection.close();
    }
}

消费者:

public class Consumer {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //产生一个随机的队列名称
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定

        System.out.println("ReceiveLogs1 Waiting for messages");
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("ReceiveLogs1 Received '" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);//队列会自动删除

    }
}

结果:


生产者 订阅者
  • 场景4:Routing (按路线发送接收)

使用场景:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。

路由

生产者:

public class RoutingProducer {

    private static final String EXCHANGE_NAME = "direct_logs";

    // 路由关键字
    private static final String[] routingKeys = new String[]{"info", "warning", "error"};

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");//注意是direct

        //发送信息
        for (String routingKey : routingKeys) {
            String message = "RoutingSendDirect Send the message level:" + routingKey;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.println("RoutingSendDirect Send" + routingKey + "':'" + message);
        }
        channel.close();
        connection.close();

    }
}

消费者1:

/**
 * 消费者1
 */
public class RoutingWorker1 {
    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_logs";
    // 路由关键字
    private static final String[] routingKeys = new String[]{"info" ,"warning"};

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //获取匿名队列名称
        String queueName=channel.queueDeclare().getQueue();

        //根据路由关键字进行绑定
        for (String routingKey:routingKeys){
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
            System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+"," +
                    " queue:"+queueName+", BindRoutingKey:" + routingKey);
        }
        System.out.println("ReceiveLogsDirect1  Waiting for messages");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }

消费者2:

/**
 * 消费者2
 */
public class RoutingWorker2 {
    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_logs";
    // 路由关键字
    private static final String[] routingKeys = new String[]{"error"};

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //获取匿名队列名称
        String queueName=channel.queueDeclare().getQueue();

        //根据路由关键字进行绑定
        for (String routingKey:routingKeys){
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
            System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+"," +
                    " queue:"+queueName+", BindRoutingKey:" + routingKey);
        }
        System.out.println("ReceiveLogsDirect1  Waiting for messages");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

结果:


生产者 消费者1 消费者2
  • 场景5:Topics (按topic发送接收)

使用场景:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。(这种应该属于模糊匹配)

topic

生产者:

public class TopicProducer {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();

        //声明一个匹配模式的交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        //待发送的消息
        String[] routingKeys=new String[]{
                "quick.orange.rabbit",
                "lazy.orange.elephant",
                "quick.orange.fox",
                "lazy.brown.fox",
                "quick.brown.fox",
                "quick.orange.male.rabbit",
                "lazy.orange.male.rabbit"
        };

        //发送消息
        for(String severity :routingKeys){
            String message = "From "+severity+" routingKey' s message!";
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
            System.out.println("TopicSend Sent '" + severity + "':'" + message + "'");
        }

        channel.close();
        connection.close();
        
    }
}

消费者1:


public class TopicConsumer1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个匹配模式的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        //路由关键字
        String[] routingKeys = new String[]{"*.orange.*"};

        //绑定路由
        for (String routingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey);
        }
        System.out.println("ReceiveLogsTopic1 Waiting for messages");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };

        channel.basicConsume(queueName, true, consumer);

    }
}

消费者2:

public class TopicConsumer2 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();


        //声明一个匹配模式的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        // 路由关键字
        String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
//      绑定路由关键字
        for (String bindingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
        }

        System.out.println("ReceiveLogsTopic2 Waiting for messages");


        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
                String message = new String(body, "UTF-8");
                System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);

    }

}

结果:


生产者 消费者1 消费者2

相关文章

  • 消息队列之 RabbitMQ

    消息队列之 RabbitMQ 转载自:消息队列值RabbitMQ 关于消息队列,从前年开始断断续续看了些资料,想写...

  • Redis作为消息队列与RabbitMQ的比较

    Redis作为消息队列与RabbitMQ的比较 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议...

  • 消息队列——了解一下

    本文将从以下几点展开 为什么使用消息队列? 消息队列的缺点 消息队列如何选型 RabbitMQ RabbitMQ ...

  • 异步消息队列

    1. 消息队列 rabbitmq - 提供消息队列服务 rabbitmq 常用指令 docker run -d -...

  • SpringBoot使用RabbitMQ及RabbitMQ介绍

    SpringBoot使用RabbitMQ RabbitMQ 是 消息队列 实现AMQP(高级消息队列协议Advan...

  • 【rabbitMQ】消息队列之 rabbitMQ

    消息队列之 RabbitMQ(推荐) https://www.jianshu.com/p/79ca08116d57...

  • 消息队列之 RabbitMQ

    消息的介绍 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技...

  • 消息队列之 RabbitMQ

    关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...

  • 消息队列之 RabbitMQ

    关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...

  • 消息队列之 RabbitMQ

    关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...

网友评论

      本文标题:消息队列之rabbitMq

      本文链接:https://www.haomeiwen.com/subject/wkvooxtx.html