美文网首页
RabbitMQ的动态创建交换机、队列、绑定、死信队列,延迟队列

RabbitMQ的动态创建交换机、队列、绑定、死信队列,延迟队列

作者: thinking2019 | 来源:发表于2020-08-20 21:53 被阅读0次

    ​---实践是检验真理的唯一标准---

    yml参数配置

    这次我使用的是RabbitTemplate

    rabbitmq:
        host: 192.168.225.136
        port: 5672
        username: thinking
        password: 123
        virtual-host: host1
        publisher-returns: true
        # 事务模式下这行需要删除
        publisher-confirm-type: correlated
        template:
          # 找不到路由规则的消息 是否保留
          mandatory: true
    

    为什么Template不需要定义configuration文件来接收yml文件的参数?

    这是个常识问题,我这里做个记录。。。

    我都能忘记昨天吃了东西的,好在我喜欢做笔记。。。

    springboot中何时加载Template,可以仔细看看自动装配注解:EnableAutoConfiguration
    这类Template模板的初始化有个Properties文件,不如:
    RabbitProperties
    RedisProperties
    方法中注解:ConfigurationProperties  指定了默认取得yml格式内容
        至于具体的属性可以找set方法
    

    我们的demo都是基于RabbitTemplate来写。。。

    初始化数据

    通过枚举ExchangeEnum、QueueEnum、BindingEnum动态维护和创建
    1.初始化交换机

    @Bean("createExchange")
    public Object createExchange(RabbitAdmin rabbitAdmin) {
        // 遍历交换机枚举
        ExchangeEnum.toList().forEach(exchangeEnum -> {
            // 根据交换机模式 生成不同的交换机
            switch (exchangeEnum.getType()) {
                case fanout:
                    rabbitAdmin.declareExchange(new FanoutExchange(exchangeEnum.getExchangeName(),
                            exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
                    break;
                case topic:
                    rabbitAdmin.declareExchange(new TopicExchange(exchangeEnum.getExchangeName(),
                            exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
                    break;
                case direct:
                    rabbitAdmin.declareExchange(new DirectExchange(exchangeEnum.getExchangeName(),
                            exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
                    break;
            }
        });
        return null;
    }
    

    2.初始化队列

    @Bean("createQueue")
    public Object createQueue(RabbitAdmin rabbitAdmin) {
        // 遍历队列枚举 将队列注册到spring bean工厂 让spring实现队列的管理
        QueueEnum.toList().forEach(queueEnum -> {
            rabbitAdmin.declareQueue(new Queue(queueEnum.getName(),
                    queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments()));
        });
        return null;
    }
    

    3.交换机和队列绑定

    @Bean("createBinding")
    public Object createBinding(RabbitAdmin rabbitAdmin) {
        // 遍历队列枚举 将队列绑定到指定交换机
        BindingEnum.toList().forEach(bindingEnum -> {
            // 交换机
            ExchangeEnum exchangeEnum = bindingEnum.getExchangeEnum();
            // queue
            QueueEnum queueEnum = bindingEnum.getQueueEnum();
            // 绑定
            rabbitAdmin.declareBinding(new Binding(
                    // queue名称
                    queueEnum.getName(),
                    Binding.DestinationType.QUEUE,
                    // exchange名称
                    exchangeEnum.getExchangeName(),
                    // queue的routingKey
                    queueEnum.getRoutingKey(),
                    // 绑定的参数
                    bindingEnum.getArguments()));
        });
        return null;
    }
    

    延迟队列

    1.定义队列

    /**
     * 超时队列---不需要定义RabbitListener方法
     */
    deal_queue("deal_queue", "deal.queue", true, false, false, dealParams()),
    /**
     * 超时接收队列
     */
    reply_queue("reply_queue", "reply.queue", true, false, false, null),
    
    public static Map<String, Object> dealParams(){
          // reply_to 队列
          Map<String,Object> map = new HashMap<>();
          //设置消息的过期时间 单位毫秒
          map.put("x-message-ttl",10000);
          //设置附带的死信交换机
          map.put("x-dead-letter-exchange","reply_exchange");
          //指定重定向的路由建 消息作废之后可以决定需不需要更改他的路由建 如果需要 就在这里指定
          map.put("x-dead-letter-routing-key","reply.queue");
          return map;
      }
    

    2.定义交换机

    /**
     * 超时交换机
     */
    deal_exchange("deal_exchange", ExchangeTypeEnum.topic, true, false),
    /**
     * 超时接收交换机
     */
    reply_exchange("reply_exchange", ExchangeTypeEnum.topic, true, false),
    

    3.交换机和队列绑定

    deal_binding(ExchangeEnum.deal_exchange, QueueEnum.deal_queue, null),
    reply_binding(ExchangeEnum.reply_exchange, QueueEnum.reply_queue, null)
    

    4.不定义超时队列的@RabbitListener,只定义超时接收队列的@RabbitListener

    @RabbitListener(queues = {"reply_queue"})
    @RabbitHandler
    public void reply_queue(Message message, Channel channel) throws Exception {
        System.err.println("消费端-reply: " + new String(message.getBody(), "UTF-8"));
        Long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
    

    测试:

    /**
     * 延迟队列测试
     */
    public void deal_queue_test() {
        ExchangeEnum exchangeEnum = BindingEnum.deal_binding.getExchangeEnum();
        QueueEnum queueEnum = BindingEnum.deal_binding.getQueueEnum();
        // 消息
        String message = "11111111111111111111111111111111111111";
        MessageProperties messageProperties = getMessageProperties();
        // 发送
        rabbitTemplate.convertSendAndReceive(
                exchangeEnum.getExchangeName(),
                queueEnum.getRoutingKey(),
                new Message(message.getBytes(), messageProperties));
    }
    

    异步队列

    1.AsyncRabbitTemplate定义

    /**
     * 异步队列
     * @param rabbitTemplate
     * @return
     */
    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){
        AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
        asyncRabbitTemplate.setReceiveTimeout(50000);
        return asyncRabbitTemplate;
    }
    

    2.测试

    public void async() {
        System.err.println("---------------async--------------start---------");
        AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("reply_exchange", "reply.queue", "123123123");
        // 配置下面代码时 如果 队列监听中没有返回值时会报错
        future.addCallback(new ListenableFutureCallback<Object>() {
            @Override
            public void onFailure(Throwable ex) {
                ex.printStackTrace();
            }
            @Override
            public void onSuccess(Object result) {
                System.out.println("回调收到结果=> " + result);
            }
        });
        System.err.println("---------------async--------------end---------");
    }
    

    3.监听方法

    @RabbitListener(queues = {"async_queue"})
    @RabbitHandler
    public Object async_queue(Message message, Channel channel) throws Exception {
        System.err.println("消费端-async: " + new String(message.getBody(), "UTF-8"));
        return "ok";
    }
    

    Java api

    1.消息回退:

    void basicNack(long deliveryTag, boolean multiple, boolean requeue)

    long deliveryTag:消息唯一标识,这是RabbitMQ自动生成的,不需要人为管理,只需要从message.getMessageProperties().getDeliveryTag() 就可以获得。
    boolean multiple:是否批量退回,不开启就使用false,开启批量退回需要增加自己的业务判断逻辑(比如:攒够几条再批量回退,或者设置等待间隔等等)
    boolean requeue:是否退回到消息队列,退回就使用true,就是交给其他消费者处理。
    

    2.拒绝消息

    void basicReject(long deliveryTag, boolean requeue) throws IOException;

    deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,它在channel范围内是唯一的
    requeue:表示如何处理这条消息,为true表示重新放入RabbitMQ的发送队列中,为false表示通知RabbitMQ销毁该消息
    

    3.确认ack

    void basicAck(long deliveryTag, boolean multiple) throws IOException;

    deliveryTag:该消息的index
    multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
    

    4.创建一个队列

    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
    ​
    durable:true、false true:在服务器重启时,能够存活
    exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
    autodelete:当没有任何消费者使用时,自动删除该队列
    

    5.启动一个消费者,并返回服务端生成的消费者标识

    /**
     * queue:队列名
     * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
     * consumerTag:客户端生成的一个消费者标识
     * nolocal:如果服务器不应将在此通道连接上发布的消息传递给此使用者,则为true;请注意RabbitMQ服务器上不支持此标记
     * exclusive: 如果是单个消费者,则为true
     * arguments:消费的一组参数
     * deliverCallback: 当一个消息发送过来后的回调接口
     * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
     * shutdownSignalCallback: 当channel/connection 关闭后回调
     */
    channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});
    

    6.取消消费者订阅

    /**
    * 取消消费者对队列的订阅关系
    * consumerTag:服务器端生成的消费者标识
    **/
    void basicCancel(String consumerTag)
    

    7.主动拉取队列中的一条消息

    /**
     * 从消息队列中取出第一条消息;整个方法的执行过程是首先消费队列,然后检索第一条消息,然后再取消订阅
     */
    GetResponse response = channel.basicGet(QUEUE_NAME, true);
    System.out.println("消费者接收到的消息是:"+new String(response.getBody(), "UTF-8"));
    

    参数介绍

    1.队列参数

    x-dead-letter-exchange 死信交换机
    x-dead-letter-routing-key 死信消息重定向路由键
    x-expires 队列在指定毫秒数后被删除
    x-ha-policy 创建HA队列
    x-ha-nodes HA队列的分布节点
    x-max-length 队列的最大消息数
    x-message-ttl 毫秒为单位的消息过期时间,队列级别
    x-max-priority 最大优先值为255的队列优先排序功能
    

    2.消息参数

    content-type 消息体的MIME类型,如application/json
    content-encoding 消息的编码类型
    message-id 消息的唯一性标识,由应用进行设置
    correlation-id 一般用做关联消息的message-id,常用于消息的响应
    timestamp 消息的创建时刻,整形,精确到秒
    

    完整项目地址在微信公众中,谢谢大家支持

    Java技术学习笔记

    相关文章

      网友评论

          本文标题:RabbitMQ的动态创建交换机、队列、绑定、死信队列,延迟队列

          本文链接:https://www.haomeiwen.com/subject/suaejktx.html