美文网首页
rocketmq源码8-客户端-MQClientInstance

rocketmq源码8-客户端-MQClientInstance

作者: modou1618 | 来源:发表于2019-01-30 07:40 被阅读0次

    一 MQClientManager

    • 单例模式
    private static MQClientManager instance = new MQClientManager();
    public static MQClientManager getInstance() {
        return instance;
    }
    
    • 根据id,创建并缓存对应的MQClientInstance
    ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable
    
    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }
    
        return instance;
    }
    

    二 MQClientInstance属性及实例化

    • 使用当前MQClientInstance的生产者,消费者,管理者的纪录信息
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    
    • topic的路由信息
    ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    
    public class TopicRouteData extends RemotingSerializable {
        private String orderTopicConf;//顺序消费的配置信息
        private List<QueueData> queueDatas;//topic在broker上的队列配置信息
        private List<BrokerData> brokerDatas;//topic所在broker的信息
        private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;//过滤server信息
    }
    
    public class QueueData implements Comparable<QueueData> {
        private String brokerName; //topc所在broker
        private int readQueueNums;//读队列数
        private int writeQueueNums;//写队列数
        private int perm;//读写配置
        private int topicSynFlag;//同步配置
    }
    
    public class BrokerData implements Comparable<BrokerData> {
        private String cluster;//broker的集群名称
        private String brokerName;//broker名称
        private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;//broker的主从节点地址
    }
    
    • 全量broker信息
    //broker主从节点地址表
    ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable
    //broker版本表
    ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable
    
    • 实例化
    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    //配置
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
    //通信客户端配置
        this.nettyClientConfig = new NettyClientConfig();
     this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    //请求码处理函数
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    //组装请求,代理通信客户端的通信接口
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
        if (this.clientConfig.getNamesrvAddr() != null) {
    //更新namesrv地址列表        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
        }
        this.clientId = clientId;
    //topic,队列,消息管理接口
        this.mQAdminImpl = new MQAdminImpl(this);
    //push方式中,异步线程处理拉消息请求。
        this.pullMessageService = new PullMessageService(this);
    //定时任务,调用消费端负载均衡服务。
        this.rebalanceService = new RebalanceService(this);
    //内部生产者topic,用于消费失败或超时的消息,sendMessageBack回发给broker,放大retry topic中重试消费
        this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
        this.defaultMQProducer.resetClientConfig(clientConfig);
    //统计服务
        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
        log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
            this.instanceIndex,
            this.clientId,
            this.clientConfig,
            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
    }
    

    三 MQClientInstance初始化

    • start()
    public void start() throws MQClientException {
    
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
    

    四 RebalanceService

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
    //默认20s,使用CountDownLatch2休眠等待20s
            this.waitForRunning(waitInterval);
    //调用
            this.mqClientFactory.doRebalance();
        }
    
        log.info(this.getServiceName() + " service end");
    }
    //遍历所有消费客户端,执行DefaultMQPushConsumerImpl.doRebalance()。具体见前述章节
    public void doRebalance() {
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }
    

    五 PullMessageService

    • push方式消费端使用
    • 存储拉消息的请求,异步处理请求拉取消息
      LinkedBlockingQueue<PullRequest> pullRequestQueue
    • pullRequestQueue生产者1,消费端负载均衡更新完成后,发送PullRequest到pullRequestQueue队列中
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
    ...
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    //遍历分配给topic的MessageQueue
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
    //没有消费缓存快照的,则需要组装PullRequest,拉取消息放入缓存快照中
            if (isOrder && !this.lock(mq)) {
    //顺序topic需要先加锁
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }
    
            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
    //计算首次消费的偏移量
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {//防止并发
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    //消费组
                    pullRequest.setConsumerGroup(consumerGroup);
                    //消费偏移量
                    pullRequest.setNextOffset(nextOffset);
                    //消息队列
                    pullRequest.setMessageQueue(mq);
                    //消息缓存快照,拉取消息后本地缓存,等待业务方消费
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }
    
    this.dispatchPullRequest(pullRequestList);
    ...
    }
    
    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (PullRequest pullRequest : pullRequestList) {
    //分发消息到阻塞队列中        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
        }
    }
    
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
            try {
    //等待获取拉消息请求PullRequest
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
    
        log.info(this.getServiceName() + " service end");
    }
    //获取消费组对应的消费客户端,执行pullMessage拉取消息。
    private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }
    

    六 MQAdminImpl

    • 创建topic
    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
        try {
            //从namesrv获取topic路由信息
            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);
            //获取broker列表
            List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
            if (brokerDataList != null && !brokerDataList.isEmpty()) {
                Collections.sort(brokerDataList);
                //至少一个broker创建成功
                boolean createOKAtLeastOnce = false;
                MQClientException exception = null;
                StringBuilder orderTopicString = new StringBuilder();
                //遍历broker
                for (BrokerData brokerData : brokerDataList) {
                    //取broker主节点地址
                    String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (addr != null) {//初始化topic的配置
                        TopicConfig topicConfig = new TopicConfig(newTopic);
                        topicConfig.setReadQueueNums(queueNum);//读队列数
                        topicConfig.setWriteQueueNums(queueNum);//写队列数
                        topicConfig.setTopicSysFlag(topicSysFlag);//单元标记
    
                        boolean createOK = false;
                        for (int i = 0; i < 5; i++) {
                   //通知broker创建topic。失败最多尝试5次
                            try {                                        
                                this.mQClientFactory.getMQClientAPIImpl()
                                    .createTopic(addr, key, topicConfig, timeoutMillis);
                                createOK = true;
                                createOKAtLeastOnce = true;
                                break;
                            } catch (Exception e) {
                                if (4 == i) {
                                    exception = new MQClientException("create topic to broker exception", e);
                                }
                            }
                        }
    
                        if (createOK) {
                            orderTopicString.append(brokerData.getBrokerName());
                            orderTopicString.append(":");
                            orderTopicString.append(queueNum);
                            orderTopicString.append(";");
                        }
                    }
                }
                //有一个broker创建topic成功,即认为成功
                if (exception != null && !createOKAtLeastOnce) {
                    throw exception;
                }
            } else {
                throw new MQClientException("Not found broker, maybe key is wrong", null);
            }
        } catch (Exception e) {
            throw new MQClientException("create new topic failed", e);
        }
    }
    
    • 查询生产路由
    public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
        try {
            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
            if (topicRouteData != null) {
    //若顺序order配置,则route.getOrderTopicConf().split(";");获取broker列表,存储broker的mq
    //若非顺序order配置,则遍历route.getQueueDatas(),选择可写的broker,保存broker主节点上的mq
                TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
                if (topicPublishInfo != null && topicPublishInfo.ok()) {
                    return topicPublishInfo.getMessageQueueList();
                }
            }
        } catch (Exception e) {
            throw new MQClientException("Can not find Message Queue for this topic, " + topic, e);
        }
    
        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
    }
    
    • 查询消费路由
    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        try {
            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
            if (topicRouteData != null) {
    //遍历route.getQueueDatas(),选择可读的broker,保存mq信息
                Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                if (!mqList.isEmpty()) {
                    return mqList;
                } else {
                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
                }
            }
        } catch (Exception e) {
            throw new MQClientException(
                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
                e);
        }
    
        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
    }
    
    • 查询偏移量
    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
    //获取队列所属broker主节点地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
     this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }
    
        if (brokerAddr != null) {
            try { //发送请求获取指定时间戳的偏移量数据     
                return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
                    timeoutMillis);
            } catch (Exception e) {
                throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
            }
        }
    
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
    
    • 查询消息详情
    protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end,
        boolean isUniqKey) throws MQClientException,
        InterruptedException {
     //获取topic路由信息
        TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
        if (null == topicRouteData) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
        }
    
        if (topicRouteData != null) {
    //遍历topic所在broker列表,获取broker地址
            List<String> brokerAddrs = new LinkedList<String>();
            for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
                String addr = brokerData.selectBrokerAddr();
                if (addr != null) {
                    brokerAddrs.add(addr);
                }
            }
    
            if (!brokerAddrs.isEmpty()) {
    //等待每个broker查询结果返回
                final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size());
    //存储查询结果
                final List<QueryResult> queryResultList = new LinkedList<QueryResult>();
    //控制对queryResultList的并发修改
                final ReadWriteLock lock = new ReentrantReadWriteLock(false);
    //遍历broker地址,发送异步查询消息,查询topic指定key在时间区间内的消息
                for (String addr : brokerAddrs) {
                    try {
                        QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();
                        requestHeader.setTopic(topic);
                        requestHeader.setKey(key);
                        requestHeader.setMaxNum(maxNum);
                        requestHeader.setBeginTimestamp(begin);
                        requestHeader.setEndTimestamp(end);
    
                        this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3,
                            new InvokeCallback() {//异步查询响应回调函数
                                @Override
                                public void operationComplete(ResponseFuture responseFuture) {
                                    try {
                                        RemotingCommand response = responseFuture.getResponseCommand();
                                        if (response != null) {
                                            switch (response.getCode()) {
                                                case ResponseCode.SUCCESS: {
                                                    QueryMessageResponseHeader responseHeader = null;
                                                    try {
                                                        responseHeader =
                                                            (QueryMessageResponseHeader) response
                                                                .decodeCommandCustomHeader(QueryMessageResponseHeader.class);
                                                    } catch (RemotingCommandException e) {
                                                        log.error("decodeCommandCustomHeader exception", e);
                                                        return;
                                                    }
    //解码查询到的消息列表
                                                    List<MessageExt> wrappers =
                                                        MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
    
                                                    QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
                                                    try {//加锁保存
                                                        lock.writeLock().lock();
                                                        queryResultList.add(qr);
                                                    } finally {
                                                        lock.writeLock().unlock();
                                                    }
                                                    break;
                                                }
                                                default:
                                                    log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark());
                                                    break;
                                            }
                                        } else {
                                            log.warn("getResponseCommand return null");
                                        }
                                    } finally {//通知一次异步查询完成
                                        countDownLatch.countDown();
                                    }
                                }
                            }, isUniqKey);
                    } catch (Exception e) {
                        log.warn("queryMessage exception", e);
                    }
    
                }
    //等待所有broker异步查询完成
                boolean ok = countDownLatch.await(timeoutMillis * 4, TimeUnit.MILLISECONDS);
                if (!ok) {
                    log.warn("queryMessage, maybe some broker failed");
                }
    
                long indexLastUpdateTimestamp = 0;
                List<MessageExt> messageList = new LinkedList<MessageExt>();
                for (QueryResult qr : queryResultList) {
                    if (qr.getIndexLastUpdateTimestamp() > indexLastUpdateTimestamp) {
                        indexLastUpdateTimestamp = qr.getIndexLastUpdateTimestamp();
                    }
    
                    for (MessageExt msgExt : qr.getMessageList()) {
                        if (isUniqKey) {//唯一key,则返回最新存储时间的broker上消息
                            if (msgExt.getMsgId().equals(key)) {
    
                                if (messageList.size() > 0) {
    
                                    if (messageList.get(0).getStoreTimestamp() > msgExt.getStoreTimestamp()) {
    
                                        messageList.clear();
                                        messageList.add(msgExt);
                                    }
    
                                } else {
    
                                    messageList.add(msgExt);
                                }
                            } else {
                                log.warn("queryMessage by uniqKey, find message key not matched, maybe hash duplicate {}", msgExt.toString());
                            }
                        } else {//一组key,则包含key的消息即保存
                            String keys = msgExt.getKeys();
                            if (keys != null) {
                                boolean matched = false;
                                String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);
                                if (keyArray != null) {
                                    for (String k : keyArray) {
                                        if (key.equals(k)) {
                                            matched = true;
                                            break;
                                        }
                                    }
                                }
    
                                if (matched) {
                                    messageList.add(msgExt);
                                } else {
                                    log.warn("queryMessage, find message key not matched, maybe hash duplicate {}", msgExt.toString());
                                }
                            }
                        }
                    }
                }
    
                if (!messageList.isEmpty()) {
                    return new QueryResult(indexLastUpdateTimestamp, messageList);
                } else {
                    throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by key finished, but no message.");
                }
            }
        }
    
        throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info");
    }
    

    七 MQClientAPIImpl

    7.1 请求码处理函数ClientRemotingProcessor

    7.1 请求码处理分发函数

    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.CHECK_TRANSACTION_STATE:
    //校验事务消息生产者的事务状态,使用事务校验线程异步处理,
    //调用事务校验回调函数检查事务状态,根据本地事务状态发送事务状态对应的处理方式消息给broker
                return this.checkTransactionState(ctx, request);
            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
                return this.notifyConsumerIdsChanged(ctx, request);
            case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
                return this.resetOffset(ctx, request);
            case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
                return this.getConsumeStatus(ctx, request);
    
            case RequestCode.GET_CONSUMER_RUNNING_INFO:
                return this.getConsumerRunningInfo(ctx, request);
    
            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                return this.consumeMessageDirectly(ctx, request);
            default:
                break;
        }
        return null;
    }
    

    7.2 代理接口

    • 和namesrv及broker通信的接口都从这里代理.

    7.2.1 发送消息

    • 组装发送消息体,支持批量消息或单个消息
    RemotingCommand request = null;
            if (sendSmartMsg || msg instanceof MessageBatch) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
            }
    
            request.setBody(msg.getBody());
    
    • 支持异步,同步,单向发送消息接口
    switch (communicationMode) {
                case ONEWAY:
                    this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                    return null;
                case ASYNC:
                    final AtomicInteger times = new AtomicInteger();
                    this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, context, producer);
                    return null;
                case SYNC:
                    return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
                default:
                    assert false;
                    break;
            }
    
    • 异步发送消息
    private void sendMessageAsync(
            final String addr,
            final String brokerName,
            final Message msg,
            final long timeoutMillis,
            final RemotingCommand request,
            final SendCallback sendCallback,
            final TopicPublishInfo topicPublishInfo,
            final MQClientInstance instance,
            final int retryTimesWhenSendFailed,
            final AtomicInteger times,
            final SendMessageContext context,
            final DefaultMQProducerImpl producer
        ) throws InterruptedException, RemotingException {
            this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {//构建异步请求响应回调函数
                @Override
                public void operationComplete(ResponseFuture responseFuture) {
                    RemotingCommand response = responseFuture.getResponseCommand();
                    if (null == sendCallback && response != null) {
    //无用户回调函数,有响应消息
                        try {//处理响应报文
                            SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                            if (context != null && sendResult != null) {
                                context.setSendResult(sendResult);
                                context.getProducer().executeSendMessageHookAfter(context);
                            }
                        } catch (Throwable e) {
                        }
    //更新broker抑制时间
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                        return;
                    }
    
                    if (response != null) {
                        try {//有响应回调函数,则调用用户回调函数接口。
                            SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                            assert sendResult != null;
                            if (context != null) {
                                context.setSendResult(sendResult);
                                context.getProducer().executeSendMessageHookAfter(context);
                            }
    
                            try {
                                sendCallback.onSuccess(sendResult);
                            } catch (Throwable e) {
                            }
    
                            producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                        } catch (Exception e) {
                            producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                            onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                                retryTimesWhenSendFailed, times, e, context, false, producer);
                        }
                    } else {
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                        if (!responseFuture.isSendRequestOK()) {
                            MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                            onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                                retryTimesWhenSendFailed, times, ex, context, true, producer);
                        } else if (responseFuture.isTimeout()) {
                            MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                                responseFuture.getCause());
                            onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                                retryTimesWhenSendFailed, times, ex, context, true, producer);
                        } else {
                            MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                            onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                                retryTimesWhenSendFailed, times, ex, context, true, producer);
                        }
                    }
                }
            });
        }
    
    • 处理响应,解析响应报文
    • 处理异常
    private void onExceptionImpl(final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int timesTotal,
        final AtomicInteger curTimes,
        final Exception e,
        final SendMessageContext context,
        final boolean needRetry,
        final DefaultMQProducerImpl producer
    ) {
        int tmp = curTimes.incrementAndGet();
        if (needRetry && tmp <= timesTotal) {//发送重试
            String retryBrokerName = brokerName;//by default, it will send to the same broker
            if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
                MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
                retryBrokerName = mqChosen.getBrokerName();
            }
            String addr = instance.findBrokerAddressInPublish(retryBrokerName);
            log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
                retryBrokerName);
            try {
            //更新请求唯一id    request.setOpaque(RemotingCommand.createNewRequestId());
    //再次发送异步消息
                sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                    timesTotal, curTimes, context, producer);
            } catch (InterruptedException e1) {
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, false, producer);
            } catch (RemotingConnectException e1) {
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, true, producer);
            } catch (RemotingTooMuchRequestException e1) {
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, false, producer);
            } catch (RemotingException e1) {
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, true, producer);
            }
        } else {
    //不重试则处理异常结果
            if (context != null) {
                context.setException(e);
                context.getProducer().executeSendMessageHookAfter(context);
            }
    
            try {//调用回调函数异常处理函数
                sendCallback.onException(e);
            } catch (Exception ignored) {
            }
        }
    }
    

    7.2.2 拉取消息

    • 同步拉取消息
    private PullResult pullMessageSync(
            final String addr,
            final RemotingCommand request,
            final long timeoutMillis
        ) throws RemotingException, InterruptedException, MQBrokerException {
            RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
            assert response != null;
    //根据结果转换状态码,返回拉取的消息
            return this.processPullResponse(response);
        }
    
    • 异步拉取消息
    private void pullMessageAsync(
        final String addr,
        final RemotingCommand request,
        final long timeoutMillis,
        final PullCallback pullCallback
    ) throws RemotingException, InterruptedException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {//封装一层异步回调函数
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (response != null) {
                    try {//处理响应结果
                        PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                        assert pullResult != null;
    //调用上层的拉消息异步回调,push方式的消息快照缓存处理
                        pullCallback.onSuccess(pullResult);
                    } catch (Exception e) {
                        pullCallback.onException(e);
                    }
                } else {//无响应,调用异常回调函数处理
                    if (!responseFuture.isSendRequestOK()) {
                        pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                    } else if (responseFuture.isTimeout()) {
                        pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                            responseFuture.getCause()));
                    } else {
                        pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                    }
                }
            }
        });
    }
    

    相关文章

      网友评论

          本文标题:rocketmq源码8-客户端-MQClientInstance

          本文链接:https://www.haomeiwen.com/subject/dguzjqtx.html