美文网首页RocketMQ
四、RocketMQ-Producer的Send方法

四、RocketMQ-Producer的Send方法

作者: ASD_92f7 | 来源:发表于2019-04-23 16:23 被阅读57次

    一、概述

    RocketMQ的producer默认有两个,一个是DefaultMQProducer,另一个是TransactionMQProducer,本文只对send方法做一个总结,其他的细节在其他章节介绍

    二、DefaultMQProducer

    一共定义了17种send方法,从4.x版本,事务消息被放到了TransactionMQProducer中,所以有15个send方法,这15个方法中,又有两个异步带超时时间的send方法被废弃了,所以有效的send方法有13个:

    1、同步发送

    /**
         * 同步发送模式. 只有消息被成功接收并且被固化完成后才会收到反馈。
         * 内置有重发机制, producer将会重试
         * {@link #retryTimesWhenSendFailed,default=2} 次 ,然后才会报错. 
         * 因此,有一定的概率向broker发送重复的消息
         * 使用者有责任去解决潜在的重复数据造成的影响
         * @param msg 待发送数据
         * @return {@link SendResult} 实体,来通知发送者发送状态等信息, 比如消息的ID
         * {@link SendStatus} 指明 broker 存储/复制 的状态, 发送到了哪个队列等等
         * @throws MQClientException 客户端异常
         * @throws RemotingException 网络连接异常
         * @throws MQBrokerException broker异常
         * @throws InterruptedException 发送线程中断异常
         */
        @Override
        public SendResult send(
            Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.defaultMQProducerImpl.send(msg);
        }
    

    2、同步发送,带超时时间

    /**
         * 与 {@link #send(Message)} 相同,只不过多了超时时间的指定.
         *
         * @param msg 待发送消息
         * @param timeout 发送超时时间
         * @return {@link SendResult} 同上
         * {@link SendStatus} 同上
         * @throws MQClientException 客户端异常
         * @throws RemotingException 网络连接异常
         * @throws MQBrokerException broker异常
         * @throws InterruptedException 发送线程中断异常
         */
        @Override
        public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.defaultMQProducerImpl.send(msg, timeout);
        }
    

    3、异步发送

    /**
         * 异步发送消息
         * 消息发送后,立即返回。broker处理返程后, 触发sendCallback回调方法
         * 与上面一样,在给出发送失败标志前,会尝试2次,所以开发者要处理重复发送带来的问题
         * @param msg 待发送消息
         * @param sendCallback 回调函数
         * @throws MQClientException 客户端异常
         * @throws RemotingException 网络异常
         * @throws InterruptedException 发送线程中断异常
         */
        @Override
        public void send(Message msg,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.send(msg, sendCallback);
        }
    

    4、异步发送,带超时时间

    @Override
        public void send(Message msg, SendCallback sendCallback, long timeout)
            throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
        }
    

    5、单向发送,不等待broker回馈

    /**
         * 发送方法不会等待broker的反馈,只会一直发
         * 所以有很高的吞吐量,但是有一定概率丢失消息
         *
         * @param msg 待发送消息
         * @throws MQClientException 客户端异常
         * @throws RemotingException 网络异常
         * @throws InterruptedException 发送线程中断异常
         */
        @Override
        public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.sendOneway(msg);
        }
    

    6、同步发送,指定队列

    /**
         * 同步发送,指定队列
         * @param msg 待发送消息
         * @param mq 指定的消息队列
         * @return {@link SendResult} 同上
         * {@link SendStatus} 同上
         */
        @Override
        public SendResult send(Message msg, MessageQueue mq)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.defaultMQProducerImpl.send(msg, mq);
        }
    

    7、同步发送,指定队列,并附带超时时间

    @Override
        public SendResult send(Message msg, MessageQueue mq, long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.defaultMQProducerImpl.send(msg, mq, timeout);
        }
    

    8、异步发送,指定队列

    @Override
        public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
            throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.send(msg, mq, sendCallback);
        }
    

    9、异步发送,指定队列,附带超时时间

    这个在4.4.0版本被设置为废弃,后续版本会给出

    /**
      * 因为在处超时异常存在问题,所以废弃
     */
    @Override
        public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
            throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
        }
    

    10、单向发送,指定队列

    @Override
        public void sendOneway(Message msg,
            MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.sendOneway(msg, mq);
        }
    

    11、同步发送,指定队列选择策略

    官方的有序消息的DEMO就是基于队列选择器做的,让一些列有序的消息(相同ID)发送到同一个队列

    /**
         * 指定队列选择策略MessageQueueSelector 
         *
         * @param msg 待发送消息
         * @param selector 队列选择器
         * @param arg 配合队列选择器选择队列的参数,一般可以是业务参数(ID等)
         * @return {@link SendResult} 同上
         * {@link SendStatus} 同上
         */
    @Override
        public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.defaultMQProducerImpl.send(msg, selector, arg);
        }
    

    12、同步发送消息,指定队列选择策略,并附带超时时间

    @Override
        public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
        }
    

    13、异步发送消息,指定队列选择策略

    @Override
        public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
            throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
        }
    

    14、异步发送消息,指定队列选择策略,并附带超时时间

    这个方法在4.4.0版本废弃,后续提供

    @Override
        public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
            throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
        }
    

    15、单向发送,指定队列选择策略

     @Override
        public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
            throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
        }
    

    三、TransactionMQProducer

    1、发送事务消息

    @Override
        public TransactionSendResult sendMessageInTransaction(final Message msg,
            final Object arg) throws MQClientException {
            if (null == this.transactionListener) {
                throw new MQClientException("TransactionListener is null", null);
            }
    
            return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
        }
    

    相关文章

      网友评论

        本文标题:四、RocketMQ-Producer的Send方法

        本文链接:https://www.haomeiwen.com/subject/bxgvgqtx.html