美文网首页
RabbitMQ核心概念

RabbitMQ核心概念

作者: 青衣敖王侯 | 来源:发表于2019-07-28 19:53 被阅读0次

      RabbitMQ是一个开源的消息代理和队列服务器,通过普通协议在各应用之间共享数据。RabbitMq使用Erlang语言来编写的,并且在RabbitMq是基于AMQP协议的。

      哪些互联网大厂在使用RabbitMQ:滴滴、美团、头条、去哪儿都在使用RabbitMq,因为开源、性能优秀、稳定性保障,提供可靠性消息投递模式(confirm)、返回模式(return),与Spring AMQP完美的结合,API丰富,集群模式丰富,支持表达式配置,HA模式(高可用模式),镜像队列模型,保证数据不丢失的前提下做到高可靠性和可用性

      RabbitMQ高性能的原因:RabbitMQ使用Erlang语言开发,Erlang语言最初在于交换机领域的架构模式,使得RabbitMq在broker之间进行数据交互的性能是非常优秀的。Erlang有着和原生socket一样的延迟。

      AMQP:Advanced Message Queuing Protocol 高级消息队列协议。它是一个二进制协议。提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

    AMQP协议模型:


    关键点在于Exchange和MessageQueue是怎么交互的。

    AMQP核心概念

    Server:又称Broker接收客户端的连接,实现AMQP实体服务
    Connection:连接,应用程序与Broker的网络连接
    Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道(读写和删除都通过它)。客户端可建立多个Channel,每个Channel代表一个会话任务(类似于数据库中的Session,数据库建立连接后就会创建一个Session)
    Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体内容。
    Virtual host:虚拟主机,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange和Queue。就像redis是一个逻辑概念,redis中默认有16个db,假设我们给redis16个GB的内存,我们不可能说db0只能1个gb,db1只能1个GB,而是将这16个GB统一的给到了整个redis,然后每个db按照业务逻辑存储不同的数据量大小。对于RabbitMQ,Virtual Host类似于对消息做路由。
    Exchange:交换机,接收消息,根据路由键转发转发消息到绑定的队列。
    Binding:Excahnge和Queue之间的虚拟连接,binding中可以包含routing key
    Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息。
    Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者。


    RabbitMQ安装

    建议参考这篇文章https://www.cnblogs.com/liaojie970/p/6138278.html
    cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.6/ebin
    vi rabbit.app

    服务的启动

    rabbitmq-server start &


    服务的停止

    rabbitmqctl stop_app或者rabbitmq-server stop

    管理插件


    启动好后看到以下界面:


    命令行与管控台基础操作






    ram表示内存加入,disc表示磁盘加入

    生产者消费者模型构建

      目前我们暂时不用springboot构建,先从原生的API了解一下rabbitmq。

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.bfxy</groupId>
        <artifactId>rabbitmq-api</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>rabbitmq-api</name>
        <description>rabbitmq-api</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.14.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.6.5</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    

    生产者

    public class Procuder {
    
        
        public static void main(String[] args) throws Exception {
            //1 创建一个ConnectionFactory, 并进行配置
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("xx.xxx.xxx.xx");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
            
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
            
            //4 通过Channel发送数据
            for(int i=0; i < 5; i++){
                String msg = "Hello RabbitMQ!";
                //1 exchange   2 routingKey 3.properties 4body 如果exchange不指定,默认会使用AMQP default这个exchange,它绑定了所有的队列,而且根据routing key的名字找对应的队列
                channel.basicPublish("", "test001", null, msg.getBytes());
            }
    
            //5 记得要关闭相关的连接
            channel.close();
            connection.close();
        }
    }
    

    消费者

    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            
            //1 创建一个ConnectionFactory, 并进行配置
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("xx.xxx.xxx.xx");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
            
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
            
            //4 声明(创建)一个队列
            String queueName = "test001";
            channel.queueDeclare(queueName, true, false, false, null);
            
            //5 创建消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            
            //6 设置Channel
            channel.basicConsume(queueName, true, queueingConsumer);
            
            while(true){
                //7 获取消息
                Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.err.println("消费端: " + msg);
                //Envelope envelope = delivery.getEnvelope();
            }
            
        }
    }
    

    Exchange交换机


    交换机属性
    交换机属性

    Direct Exchange

    所有发送到Direct Exchange的消息被转发到RouteKey中指定的queue
    Direct模式可以使用RabbitMq自带的exchange:default exchange,所以不需要将exchange进行任何绑定操作,消息传递时,routekey必须完全匹配才会被队列接收,否则该消息会被丢弃。我们看一下直连的代码示例:
    生产者:

    public class Producer4DirectExchange {
    
        
        public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("49.234.231.49");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_direct_exchange";
            String routingKey = "test.direct111";
            //5 发送
            
            String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
            channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());         
            
        }
        
    }
    

    消费者:

    public class Consumer4DirectExchange {
    
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("49.234.231.49");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_direct_exchange";
            String exchangeType = "direct";
            String queueName = "test_direct_queue";
            String routingKey = "test.direct";
            
            //表示声明了一个交换机
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            //表示声明了一个队列
            channel.queueDeclare(queueName, false, false, false, null);
            //建立一个绑定关系:
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);  
            //循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
        }
    }
    

    Topic Exchange
      所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上。Exchange将routeKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic



    Topic Exchange示例

    代码示例:
    消费者

    public class Consumer4TopicExchange {
    
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            String queueName = "test_topic_queue";
            //String routingKey = "user.*";
            String routingKey = "user.*";
            // 1 声明交换机 
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 2 声明队列
            channel.queueDeclare(queueName, false, false, false, null);
            // 3 建立交换机和队列的绑定关系:
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);  
            //循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
        }
    }
    

    生产者

    public class Producer4TopicExchange {
    
        
        public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_topic_exchange";
            String routingKey1 = "user.save";
            String routingKey2 = "user.update";
            String routingKey3 = "user.delete.abc";
            //5 发送
            
            String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
            channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
            channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());    
            channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
            channel.close();  
            connection.close();  
        }
        
    }
    

    Fanout Exchange

    这个Exchange用的是最多的,而且转发消息是最快的,因为它不处理路由键,只需要简单的将队列绑定到交换机上。发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。



    代码示例:
    消费者:

    public class Consumer4FanoutExchange {
    
        public static void main(String[] args) throws Exception {
            
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            String routingKey = ""; //不设置路由键
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer); 
            //循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
        }
    }
    

    生产者

    public class Producer4FanoutExchange {
    
        
        public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_fanout_exchange";
            //5 发送
            for(int i = 0; i < 10; i ++) {
                String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
                channel.basicPublish(exchangeName, "", null , msg.getBytes());          
            }
            channel.close();  
            connection.close();  
        }
        
    }
    

    绑定

    刚才的演示中我们看到了很多绑定,但是只是看到了Exchange和Queue之间的连接关系。其实Exchange和Exchange之间也有绑定关系。Bingding中可以包含RoutingKey或者参数。

    队列

    实际存储数据的地方。它可以选择持久化或者不持久化,如果AutoDelete选择了yes,代表最后一个监听被一处之后,该Queue会自动被删除。

    Message

    服务器和应用程序之间传递的数据,本质上就是一段数据,由Properties和Payload(Body)组成。常用的属性有delivery mode、headers(自定义属性)
    代码示例:
    消费端:

    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            
            //1 创建一个ConnectionFactory, 并进行配置
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
            
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
            
            //4 声明(创建)一个队列
            String queueName = "test001";
            channel.queueDeclare(queueName, true, false, false, null);
            
            //5 创建消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            
            //6 设置Channel
            channel.basicConsume(queueName, true, queueingConsumer);
            
            while(true){
                //7 获取消息
                Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.err.println("消费端: " + msg);
                Map<String, Object> headers = delivery.getProperties().getHeaders();
                System.err.println("headers get my1 value: " + headers.get("my1"));
                
                //Envelope envelope = delivery.getEnvelope();
            }
            
        }
    }
    

    生产者:

    public class Procuder {
    
        
        public static void main(String[] args) throws Exception {
            //1 创建一个ConnectionFactory, 并进行配置
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
            
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
            
            Map<String, Object> headers = new HashMap<>();
            headers.put("my1", "111");
            headers.put("my2", "222");
            
            
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .expiration("10000")
                    .headers(headers)
                    .build();
            
            //4 通过Channel发送数据
            for(int i=0; i < 5; i++){
                String msg = "Hello RabbitMQ!";
                //1 exchange   2 routingKey
                channel.basicPublish("", "test001", properties, msg.getBytes());
            }
    
            //5 记得要关闭相关的连接
            channel.close();
            connection.close();
        }
    }
    

    虚拟主机

    用于进行逻辑隔离,是最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue

    相关文章

      网友评论

          本文标题:RabbitMQ核心概念

          本文链接:https://www.haomeiwen.com/subject/fvpurctx.html