美文网首页
rocketmq源码5-客户端-生产者

rocketmq源码5-客户端-生产者

作者: modou1618 | 来源:发表于2019-01-27 10:38 被阅读0次

    一 DefaultMQProducer

    • 生产客户端标识producerGroup,事务消息中同一group的生产实例作为一个整体处理
    • 默认topic
    String createTopicKey = MixAll.DEFAULT_TOPIC;
    String DEFAULT_TOPIC = "TBW102";
    
    • topic创建的队列数量int defaultTopicQueueNums=4
    • 发包超时时间int sendMsgTimeout = 3000;
    • 压缩阈值,超过则压缩报文int compressMsgBodyOverHowmuch = 1024 * 4;
    • 同步发送失败,重试次数int retryTimesWhenSendFailed = 2;
    • 异步发送失败,重试次数int retryTimesWhenSendAsyncFailed = 2;
    • 发送失败时,是否更换brokerboolean retryAnotherBrokerWhenNotStoreOK = false;
    • 报文长度限制int maxMessageSize = 1024 * 1024 * 4;

    二 DefaultMQProducerImpl

    • 生产配置DefaultMQProducer defaultMQProducer;
    • 客户端状态ServiceState serviceState = ServiceState.CREATE_JUST;
    • 下层代理的客户端MQClientInstance mQClientFactory;
    • 发送请求和收包钩子RPCHook rpcHook;
    • 报文压缩级别,报文长度超过压缩阈值时,按压缩级别进行压缩。
    int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
    
    private boolean tryToCompressMessage(final Message msg) {
        if (msg instanceof MessageBatch) {
            //batch dose not support compressing right now
            return false;
        }
        byte[] body = msg.getBody();
        if (body != null) {
            if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
                try {
                    byte[] data = UtilAll.compress(body, zipCompressLevel);
                    if (data != null) {
                        msg.setBody(data);
                        return true;
                    }
                } catch (IOException e) {
                    log.error("tryToCompressMessage exception", e);
                    log.warn(msg.toString());
                }
            }
        }
    
        return false;
    }
    
    • 禁止发包校验钩子函数CheckForbiddenHook
    ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
    
    • 发包前调用禁止发包校验钩子函数CheckForbiddenHook
    if (hasCheckForbiddenHook()) {
        CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
        checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
        checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
        checkForbiddenContext.setCommunicationMode(communicationMode);
        checkForbiddenContext.setBrokerAddr(brokerAddr);
        checkForbiddenContext.setMessage(msg);
        checkForbiddenContext.setMq(mq);
        checkForbiddenContext.setUnitMode(this.isUnitMode());
        this.executeCheckForbiddenHook(checkForbiddenContext);
    }
    
    • 发包前后钩子函数SendMessageHook
    ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    
    public interface SendMessageHook {
        String hookName();
    
        void sendMessageBefore(final SendMessageContext context);
    
        void sendMessageAfter(final SendMessageContext context);
    }
    
    • 发包前钩子函数调用SendMessageHook.executeSendMessageHookBefore
    if (this.hasSendMessageHook()) {
        context = new SendMessageContext();
        context.setProducer(this);
        context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        context.setCommunicationMode(communicationMode);
        context.setBornHost(this.defaultMQProducer.getClientIP());
        context.setBrokerAddr(brokerAddr);
        context.setMessage(msg);
        context.setMq(mq);
        String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (isTrans != null && isTrans.equals("true")) {
            context.setMsgType(MessageType.Trans_Msg_Half);
        }
    
        if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
            context.setMsgType(MessageType.Delay_Msg);
        }
        this.executeSendMessageHookBefore(context);
    }
    
    • 发包后或发包产生异常时钩子函数调用SendMessageHook.executeSendMessageHookBefore
    if (this.hasSendMessageHook()) {
        context.setSendResult(sendResult);
        this.executeSendMessageHookAfter(context);
    }
    
    if (this.hasSendMessageHook()) {
        context.setException(e);
        this.executeSendMessageHookAfter(context);
    }
    
    • 事务校验线程池,见#三 TransactionMQProducer
    BlockingQueue<Runnable> checkRequestQueue;
    ExecutorService checkExecutor;
    
    • 事务校验线程池执行的事务校验任务。获取事务校验回调接口,执行事务校验,获取校验结果。根据校验结果进行返回响应。
    public void run() {
        TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
        if (transactionCheckListener != null) {
            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable exception = null;
            try {
                localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
            } catch (Throwable e) {
                log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                exception = e;
            }
    
            this.processTransactionState(
                localTransactionState,
                group,
                exception);
        } else {
            log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
        }
    }
    
    • 错误处理策略MQFaultStrategy mqFaultStrategy
    • 存储topic的存储的mq列表信息。客户端从namesrv获取并更新topic路由信息时更新。
      ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable

    三 TransactionMQProducer

    • 事务消息生产者
    • 事务校验线程池核心线程数量int checkThreadPoolMinSize = 1;
    • 事务校验线程池最大线程数量int checkThreadPoolMaxSize = 1
    • 事务校验线程池等待任务最大容量int checkRequestHoldMax = 2000;
    • 初始化事务校验线程池
    public void start() throws MQClientException {
        this.defaultMQProducerImpl.initTransactionEnv();
        super.start();
    }
    
    public void initTransactionEnv() {
        TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
        this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
        this.checkExecutor = new ThreadPoolExecutor(
            producer.getCheckThreadPoolMinSize(),
            producer.getCheckThreadPoolMaxSize(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.checkRequestQueue);
    }
    
    • 事务校验回调函数,校验事务消息,返回校验结果LocalTransactionState。
      事务校验线程池中调用的事务校验任务中调用。
    TransactionCheckListener transactionCheckListener;
    
    public interface TransactionCheckListener {
        LocalTransactionState checkLocalTransactionState(final MessageExt msg);
    }
    
    public enum LocalTransactionState {
        COMMIT_MESSAGE,//事务提交
        ROLLBACK_MESSAGE,//事务回滚
        UNKNOW,
    }
    

    四 MQFaultStrategy

    • sendLatencyFaultEnable为true表示brokerName发送失败或超时后,需要等一段时间才能继续往该brokerName发送
      boolean sendLatencyFaultEnable = false;
    • latencyMax超时时间,notAvailableDuration表示broker暂停时间。使用computeNotAvailableDuration基于超时时间计算broker暂停时间。
      如: 服务超时时间在550-1000ms之间,则broker暂停30000ms。
      服务超时时间在50-550ms之间,则broker暂停0ms。
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    
    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
    
        return 0;
    }
    
    • 暂停服务的broker存储结构
      LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    4.4 LatencyFaultToleranceImpl

    • 存储暂停服务的broker
      ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
    class FaultItem implements Comparable<FaultItem> {
        private final String name;//broker名称
        private volatile long currentLatency;//超时时间
        private volatile long startTimestamp;//暂停服务时间
    }
    
    • 通过FaultItem.compareTo()选择最优的broker
    public int compareTo(final FaultItem other) {
    //有效的更优,即暂停服务时间已经过期
        if (this.isAvailable() != other.isAvailable()) {
            if (this.isAvailable())
                return -1;
    
            if (other.isAvailable())
                return 1;
        }
    //超时时间短的更优
        if (this.currentLatency < other.currentLatency)
            return -1;
        else if (this.currentLatency > other.currentLatency) {
            return 1;
        }
    //暂停服务时间短的更优
        if (this.startTimestamp < other.startTimestamp)
            return -1;
        else if (this.startTimestamp > other.startTimestamp) {
            return 1;
        }
    
        return 0;
    }
    
    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }
    

    五 发包路由信息TopicPublishInfo

    public class TopicPublishInfo {
        private boolean orderTopic = false;//是否为顺序消息topic
        private boolean haveTopicRouterInfo = false;//
    //可用的mq
        private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    //当前使用的mq的索引
        private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
        private TopicRouteData topicRouteData;
    }
    
    • 轮询方式选择发送的mq
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
    
    • 排除上次发送失败的lastBrokerName后,轮询选择发送的mq
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
    

    五 消息生产

    5.1 普通消息

    • sendDefaultImpl()
    private SendResult sendDefaultImpl(
        Message msg,//消息
        final CommunicationMode communicationMode,//通信模式
        final SendCallback sendCallback,//异步回调
        final long timeout//超时时间
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
    
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
    //根据topic获取topic发送路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
    //同步发送有多次失败重试,异步只发送一次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
    //纪录本次发送的broker,成功则优先选择上次发送的broker傻姑娘的mq,失败则下次重试发送时避开
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
    //根据失败抑制策略,选择一个broker上的MessageQueue
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
    //发送
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
    //根据发送消息耗时,计算本次目标broker的抑制时间区间,抑制时间内不再向该broker发送消息。
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
    //同步模式,若支持发送失败重试,则重试发送
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
    
                                return sendResult;
                            default:
                                break;
                        }
    //异常,则对broker进行失败抑制
    //中断异常,则对broker进行超时抑制
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }
    
                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
    
                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }
    
            if (sendResult != null) {
                return sendResult;
            }
    
            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));
    
            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
    //根据异常,设置返回错误码
            MQClientException mqClientException = new MQClientException(info, exception);
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }
    
            throw mqClientException;
        }
    //无namesrv配置错误
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }
    //无消息所属目标topic路由配置信息
        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }
    

    5.2 顺序生产

    • sendSelectImpl(),指定mq选择器,每次发送到固定的mq
    private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,//发送的MessageQueue选择器
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
    
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
    //选择器选择目标消息队列
                mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
            } catch (Throwable e) {
                throw new MQClientException("select message queue throwed exception.", e);
            }
    //发送消息
            if (mq != null) {
                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
            } else {
                throw new MQClientException("select message queue return null.", null);
            }
        }
    //无topic路由信息异常
        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    }
    

    5.2.1 随机mq选择器

    public class SelectMessageQueueByRandom implements MessageQueueSelector {
        private Random random = new Random(System.currentTimeMillis());
    
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            int value = random.nextInt(mqs.size());
            return mqs.get(value);
        }
    }
    

    5.2.2 基于hash的mq选择器

    public class SelectMessageQueueByHash implements MessageQueueSelector {
    
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            int value = arg.hashCode();
            if (value < 0) {
                value = Math.abs(value);
            }
    
            value = value % mqs.size();
            return mqs.get(value);
        }
    }
    

    5.3 实际发送接口

    • sendKernelImpl()
    private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //从topic路由信息中根据broker名字获取broker地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
    //更新topic路由信息,重新获取broker地址
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }
    
        SendMessageContext context = null;
        if (brokerAddr != null) {
    //vip使能则端口号-2获取实际端口,否则是原端口号
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
    
            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }
                int sysFlag = 0;
    //消息压缩标记
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                }
    //事务消息预处理标记
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
    //钩子函数
     ...
    //发送消息头
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());           requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);          requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    //重试topic
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }
                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {                  requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }
    //客户端接口发送消息
                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }
    
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }
    
                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
            }
        }
    
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
    

    5.4 事务消息

    • sendMessageInTransaction()
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter tranExecuter, final Object arg)
        throws MQClientException {
        if (null == tranExecuter) {
            throw new MQClientException("tranExecutor is null", null);
        }
        Validators.checkMessage(msg, this.defaultMQProducer);
    //标记事务消息
        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {//发送事务消息
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
    
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
    //本地事务处理
                    localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }
    
                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
    //失败则设置事务状态为回滚
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
    
        try {//本地事务处理结果发送
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
    //返回事务消息结果
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }
    

    5.4.1 endTransaction

    public void endTransaction(
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
           //事务提交     requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
      //事务回滚          requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
      //未知类型          requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }
    
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    //发送事务结果消息到broker    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }
    

    相关文章

      网友评论

          本文标题:rocketmq源码5-客户端-生产者

          本文链接:https://www.haomeiwen.com/subject/qpjgjqtx.html