美文网首页RocketMQ
六、RocketMQ-Producer-Send深入一丢丢

六、RocketMQ-Producer-Send深入一丢丢

作者: ASD_92f7 | 来源:发表于2019-04-25 16:54 被阅读10次

    一、概述

    本篇主要跟踪下producer的发送流程,以SYNC同步模式为例,假定producer已经start了

    二、主线流程图

    主线流程图

    三、流程深入一丢丢

    再次说明下,这里是以SYNC为例子
    只对逻辑相对较多的几个方法做讲解

    方法2:

    增加了一个timeout,发送超时时间,默认时间 3 秒

    SendResult send(Message msg,long timeout)
    

    方法3:sendDefaultImpl()

    方法流程图
    方法内,会根据策略获取待发送的队列,然后调用sendKernelImpl发送消息,如果发送失败,会尝试 1 + 重试次数(默认为2) = 3次

    方法4 sendKernelImpl()

    sendKernelImpl
    • 首先为消息添加主键,格式如下:
      UNIQ_KEY : 0BCDF1716BEC18B4AAC27F26B89A0000
    • 压缩消息
    • 执行hookbefore方法(如果有的话)
    • 组织requestHeader作为下个方法的参数

    方法6 invokeSync

    这个方法在调用 invokeSyncImpl 的前后,分别调用了doBeforeRpcHooksdoAfterRpcHooks的hooks方法,切入RPC调用

    方法7 invokeSyncImpl

    这个是最终和broker通讯的代码,通过netty的channel.writeAndFlush(request)方法将消息发送给broker,并通过ChannelFutureListener回调函数获取broker的反馈
    通过下面的代码让阻塞线程,其实内部就是一个length=1的CountDownLatch

    RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
    

    然后在ChannelFutureListener回调函数的putResponse方法中释放,latch - 1,保证获取到回馈再返回
    具体的源代码如下:

    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
            final long timeoutMillis)
            throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
            final int opaque = request.getOpaque();
    
            try {
                final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
                this.responseTable.put(opaque, responseFuture);
                final SocketAddress addr = channel.remoteAddress();
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }
    
                        responseTable.remove(opaque);
                        responseFuture.setCause(f.cause());
                        responseFuture.putResponse(null);
                        log.warn("send a request command to channel <" + addr + "> failed.");
                    }
                });
                // 在这里阻塞 等待响应
                RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
                if (null == responseCommand) {
                    if (responseFuture.isSendRequestOK()) {
                        throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                            responseFuture.getCause());
                    } else {
                        throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                    }
                }
    
                return responseCommand;
            } finally {
                this.responseTable.remove(opaque);
            }
        }
    

    相关文章

      网友评论

        本文标题:六、RocketMQ-Producer-Send深入一丢丢

        本文链接:https://www.haomeiwen.com/subject/yteagqtx.html