美文网首页
rabbitmq入门

rabbitmq入门

作者: 小鱼_a563 | 来源:发表于2022-12-01 21:59 被阅读0次

    什么是rabbitmq?

    RabbitMQ.png

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端

    rabbitmq.png

    第一种方式:点对点

    点对点.png
    • P:生成者
    • C:消费者
    • 红色方块代表信道

    生成者

    void producerSendMessing() {
            //生产者代码
            //创建一个rabbitmq的连接工厂
            ConnectionFactory connectionFactory =new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setConnectionTimeout(1000);
            connectionFactory.setPort(5672);
            //设置虚拟主机
            connectionFactory.setVirtualHost("/ems");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            try {
                //开启一个server连接
                Connection connection = connectionFactory.newConnection();
                //server连接创建一个信道
                Channel channel = connection.createChannel();
                //通过信道绑定一个队列,两个重载方法
                /*
                *  @param queue :队列的名称
                 * @param durable 是否持久化,
                 * @param exclusive 是否独占这个队列
                 * @param autoDelete 是否消费之后,删除队列
                 * @param arguments 其他参数设置,是一个map
                * */
                channel.queueDeclare("hello",true,false,false,null);
                /**
                 * 三个重载方法,
                 *
                 * @param exchange 交换器
                 * @param routingKey 路由密钥
                 * @param支持消息的其他属性-路由标头等
                 * @param正文消息正文
                 */
                channel.basicPublish("","hello",null,"hello world!".getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    

    消费者

    void consumerReceivingMessing() {
            //生产者代码
            //创建一个rabbitmq的连接工厂
            ConnectionFactory connectionFactory =new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setConnectionTimeout(1000);
            connectionFactory.setPort(5672);
            //设置虚拟主机
            connectionFactory.setVirtualHost("/ems");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            try {
                //开启一个server连接
                Connection connection = connectionFactory.newConnection();
                //server连接创建一个信道
                Channel channel = connection.createChannel();
                //通过信道绑定一个队列,两个重载方法
                /*
                 *  @param queue :队列的名称
                 * @param durable 是否持久化,
                 * @param exclusive 是否独占这个队列
                 * @param autoDelete 是否消费之后,删除队列
                 * @param arguments 其他参数设置,是一个map
                 * */
                channel.queueDeclare("hello",true,false,false,null);
                /**
                 * @param queue队列名称
                 * @param autoAck如果服务器应考虑消息,则为true。自动确认机制
                 *交付后确认; 如果服务器应该期望,则返回false
                 *明确的确认
                 * @param回调用户对象的接口Consumer,默认实现类DefaultConsumer,需要传入信道
                 */
                String hello = channel.basicConsume("hello", true,new DefaultConsumer(channel){
                        //最后一个参数是消息体
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body)
                                throws IOException
                        {
                            System.out.println(new String(body));
                        }
    
                });
                System.out.println(hello);
                //连接关闭
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    

    总结

    • 无论是生成者还是消费者都需要连接到rabbitmq的server。通过信道操作消息

    work模型

    Work queues, 也被称为(Task queues), 任务模型。 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费, 就会消失,因此任务是不会被重复执行的。


    image.png

    角色:
    ●P:生产者:任务的发布者
    ●C1:消费者,领取任务并且完成任务,假设完成速度较慢
    ●C2:消费者2:领取任务并完成任务,假设完成速度快

    总结

    • 默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。

    为了避免消费者1处理消息的业务慢,消费者2处理消息的业务快。但是因为轮询机制,导致消息被消费者1拿了过去进行堵塞,从而导致系统宕机。消息确认机制相应出现。

    消费者需要拿到消息之后回复队列已处理才会拿到下一条消息,从而实现能者多劳

    • 消费者1:假设完成速度较慢
    public static void main(String[] args) throws IOException {
            Connection mqConnection = RabbitMqUtils.getMqConnection();
            //创建一个信道
            Channel channel = mqConnection.createChannel();
            //信道每次只传递1个消息
            channel.basicQos(1);
            //绑定到一个work队列
            channel.queueDeclare("work",true,false,false,null);
            //获取消息
            channel.basicConsume("work",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        //模拟消费者处理慢的场景
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消费者2:--->"+new String(body));
                    //参数1:根据标签回复那条消息,参数2:是否回复多条消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
        }
    
    • 消费者2:领取任务并完成任务,假设完成速度快
    public static void main(String[] args) throws IOException {
            Connection mqConnection = RabbitMqUtils.getMqConnection();
            //创建一个信道
            Channel channel = mqConnection.createChannel();
            //信道每次只传递1个消息
            channel.basicQos(1);
            //绑定到一个work队列
            channel.queueDeclare("work",true,false,false,null);
            channel.basicConsume("work",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1:--->"+new String(body));
                    //参数1:根据标签回复那条消息,参数2:是否回复多条消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
    
                }
            });
        }
    

    注意

    代码中在消费者里面设置了信道里面只消费1条消息,并且处理业务之后会回复消息已被消费

    fanout广播模型

    fanout.png
    在广播模式下,消息发送流程是这样的:
    ●可以有多个消费者
    ●每个消费者有自己的queue (队列)
    ●每个队列都要绑定到Exchange (交换机)
    ●生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
    ●交换机把消息发送给绑定过的所有队列
    ●队列的消费者都能拿到消息。实现一条消息被多个消费者消费
    生产者代码
    public static void main(String[] args) throws IOException {
            Connection mqConnection = RabbitMqUtils.getMqConnection();
            //创建一个信道
            Channel channel = mqConnection.createChannel();
            //声明一个交换机。参数1:交换机名称,参数2:选择交换机模式。fanout:广播模式
            channel.exchangeDeclare("logs","fanout");
            channel.basicPublish("logs","",null,("fanout条消息").getBytes());
            //关闭连接
            RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
    
        }
    

    消费者代码

    public static void main(String[] args) throws IOException {
            Connection mqConnection = RabbitMqUtils.getMqConnection();
            //创建一个信道
            Channel channel = mqConnection.createChannel();
            //绑定一个交换机
            channel.exchangeDeclare("logs","fanout");
            //获取临时队列
            String queue = channel.queueDeclare().getQueue();
            //绑定交换机和队列
            channel.queueBind(queue,"logs","");
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1:--->"+new String(body));
                }
            });
        }
    

    Routing模式:1direct(直连模型)

    在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
    在Direct模型下:
    ●队列与交换机的绑定,不能是任意绑定了,而是要指定-个RoutingKey (路由key)
    ●消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey。
    ●Exchange不再把消息交给每- 个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息


    image.png

    生产者代码

    public static void main(String[] args) throws IOException {
            Connection mqConnection = RabbitMqUtils.getMqConnection();
            //创建一个信道
            Channel channel = mqConnection.createChannel();
            //创建一个交换器,参数1:交换器名称,参数2:交换器模式:路由
            channel.exchangeDeclare("log_direct","direct");
            //声明一个routingKey
            String routingKey="error";
            //发送消息
            channel.basicPublish("log_direct",routingKey,null,("这是direc模型发送的消息:"+routingKey).getBytes());
            //关闭链接
            RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
        }
    

    消费者1代码

    public static void main(String[] args) throws IOException {
            //获取链接
            Connection mqConnection = RabbitMqUtils.getMqConnection();
            //创建信道
            Channel channel = mqConnection.createChannel();
            //信道绑定交换器
            channel.exchangeDeclare("log_direct","direct");
            //获取临时队列名称
            String queue = channel.queueDeclare().getQueue();
            String routingKey="info";
            //信道绑定交换器,路由键,队列
            channel.queueBind(queue,"log_direct",routingKey);
            //获取消息
            channel.basicConsume(queue,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1:"+new String(body));
                }
            });
        }
    

    消费者2代码

    public static void main(String[] args) throws IOException {
            //创建一个连接
            Connection mqConnection = RabbitMqUtils.getMqConnection();
            //创建一个信道
            Channel channel = mqConnection.createChannel();
            //绑定一个交换机
            channel.exchangeDeclare("log_direct","direct");
            //获取信道的一个临时队列
            String queue = channel.queueDeclare().getQueue();
            //定义三个路由键
            String routingKey="error";
            String routingKeyWarring="warring";
            String routingKeyInfo="info";
            //信道绑定队列,交换机和路由键
            channel.queueBind(queue,"log_direct",routingKey);
            channel.queueBind(queue,"log_direct",routingKeyWarring);
            channel.queueBind(queue,"log_direct",routingKeyInfo);
            //获取消息
            channel.basicConsume(queue,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2:消费了"+new String(body));
                }
            });
        }
    

    Routing模式:topic(订阅模型)

    Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配
    符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以"分割,例如: item. insert


    topic.png
    • 通配符:
      * 代表只匹配一个单词,比如user.*的路由键可以接受user.add,user.delete,user.update
      #代表只匹配多个单词,比如user.#的路由键可以接受user.add.all,user.delete.all

    生产者代码

    public static void main(String[] args) throws IOException {
            //创建连接
            Connection mqConnection = RabbitMqUtils.getMqConnection();
            //创建信道
            Channel channel = mqConnection.createChannel();
            //绑定交换机
            channel.exchangeDeclare("topic","topic");
            //路由
            String routingKey="user.save";
            //发送消息
            channel.basicPublish("topic",routingKey,null,("生产了"+routingKey+"消息").getBytes());
            //关闭连接
            RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
        }
    

    消费者1代码是 * 的通配符

    public static void main(String[] args) throws IOException {
            Connection mqConnection = RabbitMqUtils.getMqConnection();
            Channel channel = mqConnection.createChannel();
            channel.exchangeDeclare("topic","topic");
            String queue = channel.queueDeclare().getQueue();
            //路由键匹配一个单词比如user.save
            channel.queueBind(queue,"topic","user.*");
            channel.basicConsume(queue,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1:"+new String(body));
                }
            });
        }
    

    消费者2代码是#号的通配符

    public static void main(String[] args) throws IOException {
            Connection mqConnection = RabbitMqUtils.getMqConnection();
            Channel channel = mqConnection.createChannel();
            channel.exchangeDeclare("topic","topic");
            String queue = channel.queueDeclare().getQueue();
            //路由键匹配多个单词比如user.id.delete
            channel.queueBind(queue,"topic","user.#");
            //消费消息
            channel.basicConsume(queue,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1:"+new String(body));
                }
            });
        }
    

    相关文章

      网友评论

          本文标题:rabbitmq入门

          本文链接:https://www.haomeiwen.com/subject/koshlltx.html