美文网首页
rabbitmq介绍

rabbitmq介绍

作者: 源来是你啊 | 来源:发表于2019-09-28 15:29 被阅读0次

    RabbitMq

    amqp协议

    AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。

    1.简介

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

    2.使用

    2.1 核心概念

    Message :消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、 priority(相对于其他消息的优先权)、 delivery-mode(指出 该消息可能需要持久性存储)等。

    Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序

    Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型: direct(默认), fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别

    Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

    Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连 接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。

    Connection:网络连接,比如一个TCP连接。

    Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接, AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

    Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

    Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。 vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

    2019-06-14_145924.png

    2.2 简单模式

    dubbo-service-governance.jpg

    一个生产者,一个消费者

     * 获取连接
     * @return Connection
     * @throws Exception
     */
     public static Connection getConnection() throws Exception {
     //定义连接工厂
     ConnectionFactory factory = new ConnectionFactory();
     factory.setHost("192.168.1.235");
     factory.setPort(5672);
     //设置vhost
     factory.setVirtualHost("/tzb");
     factory.setUsername("test");
     factory.setPassword("123456");
     //通过工厂获取连接
     Connection connection = factory.newConnection();
     return connection;
     }
    ​
     //创建队列,发送消息
     public static void main(String[] args) throws Exception {
     //获取连接
     Connection connection = ConnectionUtil.getConnection();
     //创建通道
     Channel channel = connection.createChannel();
     //声明创建队列
     channel.queueDeclare(QUEUE_NAME,false,false,false,null);
     //消息内容
     String message = "Hello World!";
     channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
     System.out.println("发送消息:"+message);
     //关闭连接和通道
     channel.close();
     connection.close();
     }
    ​ //消费者消费消息
     public static void main(String[] args) throws Exception {
     //获取连接和通道
     Connection connection = ConnectionUtil.getConnection();
     Channel channel = connection.createChannel();
     //声明通道
     channel.queueDeclare(QUEUE_NAME,false,false,false,null);
     //定义消费者
     QueueingConsumer consumer = new QueueingConsumer(channel);
     //监听队列
     channel.basicConsume(QUEUE_NAME,true,consumer);
    ​
     while(true){
     //这个方法会阻塞住,直到获取到消息
     QueueingConsumer.Delivery delivery = consumer.nextDelivery();
     String message = new String(delivery.getBody());
     System.out.println("接收到消息:"+message);
     }
     }
    

    2.3 work模式

    2019-06-14_175154.png

    一个生产者,多个消费者,每个消费者获取到的消息唯一

    public static void main(String[] args) throws Exception {
     //获取连接和通道
     Connection connection = ConnectionUtil.getConnection();
     Channel channel = connection.createChannel();
     //声明队列
     channel.queueDeclare(QUEUE_NAME,false,false,false,null);
     String message = "";
     for(int i = 0; i<100; i++){
     message = "" + i;
     channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
     System.out.println("发送消息:"+message);
     Thread.sleep(i);
     }
    ​
     channel.close();
     connection.close();
     }
    ​
     //消费者1
     public static void main(String[] args) throws Exception {
     Connection connection = ConnectionUtil.getConnection();
     Channel channel = connection.createChannel();
     channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    ​
     //同一时刻服务器只发送一条消息给消费端
     channel.basicQos(1);
    ​
     QueueingConsumer consumer = new QueueingConsumer(channel);
    ​
     channel.basicConsume(QUEUE_NAME,false,consumer);
    ​
     while(true){
     QueueingConsumer.Delivery delivery = consumer.nextDelivery();
     String message = new String(delivery.getBody());
     System.out.println("recive1:"+message);
     Thread.sleep(100);
     //消息消费完给服务器返回确认状态,表示该消息已被消费
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
     }
     }
    ​
     //生产者2
     public static void main(String[] args) throws Exception {
     Connection connection = ConnectionUtil.getConnection();
     Channel channel = connection.createChannel();
     channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    ​
     channel.basicQos(1);
    ​
     QueueingConsumer consumer = new QueueingConsumer(channel);
    ​
     channel.basicConsume(QUEUE_NAME,true,consumer);
    ​
     while(true){
     QueueingConsumer.Delivery delivery = consumer.nextDelivery();
     String message = new String(delivery.getBody());
     System.out.println("recive1:"+message);
     Thread.sleep(10);
     //channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
     }
     }
    

    消息消费的两种模式

    1、 自动模式

    消费者从消息队列获取消息后,服务端就认为该消息已经成功消费。

    2、 手动模式

    消费者从消息队列获取消息后,服务端并没有标记为成功消费 ​ 消费者成功消费后需要将状态返回到服务端

    2.4 订阅模式

    一个生产者发送的消息会被多个消费者获取。

    生产者:可以将消息发送到队列或者是交换机。

    消费者:只能从队列中获取消息。

    如果消息发送到没有队列绑定的交换机上,那么消息将丢失。

    2019-06-14_175701.png

    2.5 路由模式

    1、 发送消息到交换机并且要指定路由key

    2、 消费者将队列绑定到交换机时需要指定路由key

    是一种完全匹配,只有匹配到的消费者才能消费消息

    2019-06-14_175852.png

    相关文章

      网友评论

          本文标题:rabbitmq介绍

          本文链接:https://www.haomeiwen.com/subject/fqgquctx.html