美文网首页RocketMQ
六、RocketMQ-Producer-Send深入一丢丢

六、RocketMQ-Producer-Send深入一丢丢

作者: ASD_92f7 | 来源:发表于2019-04-25 16:54 被阅读10次

一、概述

本篇主要跟踪下producer的发送流程,以SYNC同步模式为例,假定producer已经start了

二、主线流程图

主线流程图

三、流程深入一丢丢

再次说明下,这里是以SYNC为例子
只对逻辑相对较多的几个方法做讲解

方法2:

增加了一个timeout,发送超时时间,默认时间 3 秒

SendResult send(Message msg,long timeout)

方法3:sendDefaultImpl()

方法流程图
方法内,会根据策略获取待发送的队列,然后调用sendKernelImpl发送消息,如果发送失败,会尝试 1 + 重试次数(默认为2) = 3次

方法4 sendKernelImpl()

sendKernelImpl
  • 首先为消息添加主键,格式如下:
    UNIQ_KEY : 0BCDF1716BEC18B4AAC27F26B89A0000
  • 压缩消息
  • 执行hookbefore方法(如果有的话)
  • 组织requestHeader作为下个方法的参数

方法6 invokeSync

这个方法在调用 invokeSyncImpl 的前后,分别调用了doBeforeRpcHooksdoAfterRpcHooks的hooks方法,切入RPC调用

方法7 invokeSyncImpl

这个是最终和broker通讯的代码,通过netty的channel.writeAndFlush(request)方法将消息发送给broker,并通过ChannelFutureListener回调函数获取broker的反馈
通过下面的代码让阻塞线程,其实内部就是一个length=1的CountDownLatch

RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

然后在ChannelFutureListener回调函数的putResponse方法中释放,latch - 1,保证获取到回馈再返回
具体的源代码如下:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            // 在这里阻塞 等待响应
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

相关文章

  • 六、RocketMQ-Producer-Send深入一丢丢

    一、概述 本篇主要跟踪下producer的发送流程,以SYNC同步模式为例,假定producer已经start了 ...

  • 丢丢丢丢

    大家好,我叫丢丢木 2017年9月15日,阳光明媚,安静的时间里。简单的:“大家好,我今天结婚,欢迎朋友来我家玩”...

  • 丢丢(一)

    一级标题 二级标题 三级标题 四级标题 五级标题 背景,斜体,加粗 第一条 第二条 第三条 第一条第一个分条第二个...

  • 一丢丢

    总是在深夜把思念狠命得抛向星空 想让星月再亮一丢丢 只要刚刚好能照到你的双眸 白天的疲惫不堪总是钻进我的梦 想让我...

  • 丢丢丢

    最近,我学了一堂课《断舍离》讲的内容大概是不要做不丢男,不丢女!我和家人几乎都是属于这种类型!只知道买买买,却不...

  • 丢丢丢

    这个时候什么都不如老滕的一个拥抱来的痛快吧,风吹日晒了一天,见不到想见的人,两天了,还有多少个这样的日子

  • 丢丢丢~

    有些事情必须记录下来,只有这样,在今天还是遥远到被遗忘的明天,都可以翻出来以确凿证据的方式登场,让当事人当庭认罪,...

  • 丢!丢!丢!

    年终了。 收拾,整理屋子,是每个窝必须要做的事。那是中国过年的年前传统例牌。洗洗刷刷,这样才有要过年的感觉。 阳光...

  • 丢丢丢

    朱小四 唐心阳同学今日分享:两个人上山捡石头,返回时,一个人只拿一块自己最喜欢的石头,另一个人背了一大包石头。最终...

  • 丢丢丢,丢掉( ˘͈ ᵕ ˘͈ )

    对于很多人来说,最为难的事,就是把曾经很有意思,将来可能很有用,说不定还能用的东西处理掉。 我爷爷的观点是,世界上...

网友评论

    本文标题:六、RocketMQ-Producer-Send深入一丢丢

    本文链接:https://www.haomeiwen.com/subject/yteagqtx.html