美文网首页IT小白
RabbitMQ基础概念和使用

RabbitMQ基础概念和使用

作者: 边学边记 | 来源:发表于2019-04-22 17:50 被阅读0次

    基本概念

    Amqp概念

    amqp,既Advanced Message Queuing Protocol ,一个提供统一消息服务的应用层标准高级消息队列协议

    包括的要素

    信道:多个线程间通过一个 tcp链接与服务端通讯,每个线程使用一个信道通讯,保证

    交换器-队列-绑定-路由键

    • 路由键是发送者指定由交换机和队列的绑定
    • 生产者发送消息到交换机
    • 消费者绑定队列
    • 到达了无人订阅的队列,消息会一直排队等待
    • 一个队列有多个订阅者---消息会循环方式 以此发个这几个消费者
    • 发送者发送一个不存在的路由键--消息会丢失
    clipboard.png

    消息的确认

    消费者收到的每一条消息都必须进行确认(手动或者自动)

    • 消费者迟迟不确认,rabbitMQ 会一直会保持这个消息,直到链接的断开,会将消息投递到另一个消费者(前提是有多个消费者)

    topic模式

    可以使通配符 通过交换机&路由键是消息到多个队列中去

    clipboard.png

    虚拟主机

    /service1

    /service2

    多个应用分区,类似oracle - 表空间

    每个用户名只能连自己的虚拟主机

    多个应用时可以很好地做服务隔离


    基础使用

    使用RabbitMQ原生Java客户端进行消息通信

    客户端需要amqp-client-5.0.0.jar和slf4j-api-1.6.1.jar
    建议使用Maven:
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.0.0</version>
    </dependency>
    注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的中央仓库查)的版本。
    

    使用RabbitMQ原生Java客户端进行消息通信

    Direct Exchange示例

    简单形式的 生产者-消费者

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     *direct类型交换器的生产者
     */
    public class DirectProducer {
    
        public final static String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args)
                throws IOException, TimeoutException {
            /* 创建连接,连接到RabbitMQ*/
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            Connection connection = connectionFactory.newConnection();
    
            /*创建信道*/
            Channel channel = connection.createChannel();
            /*创建交换器*/
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");
            //channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
    
            /*日志消息级别,作为路由键使用*/
            String[] serverities = {"error","info","warning"};
            for(int i=0;i<3;i++){
                String severity = serverities[i%3];
                String msg = "Hellol,RabbitMq"+(i+1);
                /*发布消息,需要参数:交换器,路由键,其中以日志消息级别为路由键*/
                channel.basicPublish(EXCHANGE_NAME,severity,null,
                        msg.getBytes());
                System.out.println("Sent "+severity+":"+msg);
            }
            channel.close();
            connection.close();
    
        }
    
    }
    

    普通消费者

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     *普通的消费者
     */
    public class NormalConsumer {
    
        public static void main(String[] argv)
                throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
    
            // 打开连接和创建频道,与发送端一样
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                    "direct");
    
            /*声明一个队列*/
            String queueName = "focuserror";
            channel.queueDeclare(queueName,false,false,
                    false,null);
    
            /*绑定,将队列和交换器通过路由键进行绑定*/
            String routekey = "info";/*表示只关注error级别的日志消息*/
            channel.queueBind(queueName,DirectProducer.EXCHANGE_NAME,routekey);
    
            System.out.println("waiting for message........");
    
            /*声明了一个消费者*/
            //envelope  信封 可以获取路由键,队列名 等信息
            final Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received["+envelope.getRoutingKey()
                            +"]"+message);
                }
            };
            /*消费者正式开始在指定队列上消费消息*/
            channel.basicConsume(queueName,true,consumer);
        }
    
    }
    

    消费者绑定多个路由键

    结果:这个消费者 会收到多个队列的消息

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     *队列和交换器的多重绑定
     */
    public class MulitBindConsumer {
    
        public static void main(String[] argv) throws IOException,
                InterruptedException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
    
            // 打开连接和创建频道,与发送端一样
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                    "direct");
    
            //声明一个随机队列
            String queueName = channel.queueDeclare().getQueue();
    
            /*队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定*/
            String[] severities={"error","info","warning"};
            for(String serverity:severities){
                channel.queueBind(queueName,DirectProducer.EXCHANGE_NAME,
                        serverity);
            }
            System.out.println(" [*] Waiting for messages:");
    
            // 创建队列消费者
            final Consumer consumerA = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties
                                                   properties,
                                           byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" Received "
                            + envelope.getRoutingKey() + ":'" + message
                            + "'");
                }
            };
            channel.basicConsume(queueName, true, consumerA);
        }
    }
    

    一个连接多个信道

    结果:每个消费者 都会收到所有的消息

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     *一个连接多个信道
     */
    public class MulitChannelConsumer {
    
        private static class ConsumerWorker implements Runnable{
            final Connection connection;
    
            public ConsumerWorker(Connection connection) {
                this.connection = connection;
            }
    
            public void run() {
                try {
                    /*创建一个信道,意味着每个线程单独一个信道*/
                    final Channel channel = connection.createChannel();
                    channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                            "direct");
                    // 声明一个随机队列
                    String queueName = channel.queueDeclare().getQueue();
                    //消费者名字,打印输出用
                    final String consumerName
                            =  Thread.currentThread().getName()+"-all";
    
                    //所有日志严重性级别
                    String[] severities={"error","info","warning"};
                    for (String severity : severities) {
                        //关注所有级别的日志(多重绑定)
                        channel.queueBind(queueName,
                                DirectProducer.EXCHANGE_NAME, severity);
                    }
                    System.out.println("["+consumerName+"] Waiting for messages:");
    
                    // 创建队列消费者
                    final Consumer consumerA = new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties
                                                           properties,
                                                   byte[] body)
                                throws IOException {
                            String message =
                                    new String(body, "UTF-8");
                            System.out.println(consumerName
                                    +" Received "  + envelope.getRoutingKey()
                                    + ":'" + message + "'");
                        }
                    };
                    channel.basicConsume(queueName, true, consumerA);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] argv) throws IOException,
                InterruptedException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
    
            // 打开连接和创建频道,与发送端一样
            Connection connection = factory.newConnection();
            //一个连接多个信道
            for(int i=0;i<2;i++){
                /*将连接作为参数,传递给每个线程*/
                Thread worker =new Thread(new ConsumerWorker(connection));
                worker.start();
            }
        }
    }
    

    一个队列多个消费者,则会表现出消息在消费者之间的轮询发送。

    消费者会轮询收到消息

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     *一个队列多个消费者,则会表现出消息在消费者之间的轮询发送。
     */
    public class MulitConsumerOneQueue {
    
        private static class ConsumerWorker implements Runnable{
            final Connection connection;
            final String queueName;
    
            public ConsumerWorker(Connection connection,String queueName) {
                this.connection = connection;
                this.queueName = queueName;
            }
    
            public void run() {
                try {
                    /*创建一个信道,意味着每个线程单独一个信道*/
                    final Channel channel = connection.createChannel();
                    channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                            "direct");
                    /*声明一个队列,rabbitmq,如果队列已存在,不会重复创建*/
                    channel.queueDeclare(queueName,
                            false,false,
                            false,null);
                    //消费者名字,打印输出用
                    final String consumerName
                            =  Thread.currentThread().getName();
    
                    //所有日志严重性级别
                    String[] severities={"error","info","warning"};
                    for (String severity : severities) {
                        //关注所有级别的日志(多重绑定)
                        channel.queueBind(queueName,
                                DirectProducer.EXCHANGE_NAME, severity);
                    }
                    System.out.println(" ["+consumerName+"] Waiting for messages:");
    
                    // 创建队列消费者
                    final Consumer consumerA = new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties
                                                           properties,
                                                   byte[] body)
                                throws IOException {
                            String message =
                                    new String(body, "UTF-8");
                            System.out.println(consumerName
                                    +" Received "  + envelope.getRoutingKey()
                                    + ":'" + message + "'");
                        }
                    };
                    channel.basicConsume(queueName, true, consumerA);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] argv) throws IOException,
                InterruptedException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
    
            // 打开连接和创建频道,与发送端一样
            Connection connection = factory.newConnection();
    
            //3个线程,线程之间共享队列,一个队列多个消费者
            String queueName = "focusAll";
            for(int i=0;i<3;i++){
                /*将队列名作为参数,传递给每个线程*/
                Thread worker =new Thread(new ConsumerWorker(connection,queueName));
                worker.start();
            }
    
        }
    }
    

    个人理解:信道只是客户端与服务端建立连接 ,消息消费时 是多个消费者轮询收到消息 还是 每个消费者收到全部消息 取决于这些消费者是否监听同一个队列
    模式:
    一个连接多个信道 -- 实际是 每个信道中有一个随机产生的队列名,此时是多个队列是不同的队列,每个队列 有一个消费者 那么这个消费者就会收到这个队列里的所有消息;在上一层 每个信道使用(邦定)的是同一个路由键(就是生产者发布的路由键),所以此时每个队列都会收到生产者发布的所有消息,进而每个消费者就可以收到所有消息
    一个队列多个消费者,则会表现出消息在消费者之间的轮询发送 -- 实际上是 多个信道使用(邦定)的同一个路由键,而这些信道使用的是同一个队列,消息会发布到这个队列中(此时只有这一个队列),所以多个消费者监听的是同一个队列,此时消息会被这些消费者(轮询,分发)收到

    相关文章

      网友评论

        本文标题:RabbitMQ基础概念和使用

        本文链接:https://www.haomeiwen.com/subject/hfeugqtx.html