美文网首页RocketMQ
三、RocketMQ-Producer启动流程

三、RocketMQ-Producer启动流程

作者: ASD_92f7 | 来源:发表于2019-04-15 18:12 被阅读5次

一、UML图例

producer启动时序图

二、大致流程说明:

流程为非事务消息流程

  1. 在main方法中调用 new DefaultMQProducer(... ...).start(true);
  2. DefaultMQProducerImpl调用checkConfig()方法检查groupName等参数是否合法,然后调用MQClientManagergetAndCreateMQClientInstance()registerProducer()方法,实例化一个MQClientInstance并将new出来的DefaultMQProducer注册到MQClientInstance中(放到了producerTable这个map中)
  3. MQClientInstance中,依次调用
  • MQClientAPIImpl.start()
    启动 remotingClient,request-response channel,即通过netty与name server和broker连接
  • this.startScheduledTask()
    Start various schedule tasks ,启动定时任务,定时更新remotingClient的NameSrv的地址等配置信息
  • PullMessageServic.start()
    Start pull service,启动消息拉取客户端
  • RebalanceService.start()
    Start rebalance service,启动负载均衡服务
  • this.sendHeartbeatToAllBrokerWithLock()
    向broker发送心跳,并拉取配置更新remotingClient
  • DefaultMQProducerImpl.start(false)
    再次调用DefaultMQProducerImpl的start方法,不过不再进行初始化
    这里为什么这么写,后续理解了再进行补充,反正是连上后,发送了两次心跳
  • this.sendHeartbeatToAllBrokerWithLock()
    向broker发送心跳,并拉取配置更新remotingClient

三、深入MQClientInstance

1、mQClientAPIImpl.start()

image.png

先看源码:

  • 这货就新建了一个netty client
  • scheduleAtFixedRate启动了一个定时 :
    This method is periodically invoked to scan and expire deprecated request,大意是周期性地调用,用来扫描并让废弃的请求过期
  • this.nettyEventExecutor.start()
    启动netty event监听,暂时不太清除干啥使的,后续补充!
public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

四、MQClientInstance.startScheduledTask()

定义了一批定时任务:


定时任务
  • fetchNameServerAddr
    如果启动的时候没有指定Name Server的地址,则会尝试通过http等其他的方式获取
    周期:延迟10秒,2分钟一次
  • updateTopicRouteInfoFromNameServer
    从Name Server获取Topic及Route信息,并更新
    周期:延迟10毫秒,每30秒一次
  • cleanOfflineBroker,sendHeartbeatToAllBrokerWithLock
    清理以经下线的broker,发送心跳
    周期:延迟1秒,每30秒一次
  • persistAllConsumerOffset
    持久化 offset
    周期:延迟10秒,每5秒执行一次
  • adjustThreadPool
    调整线程池大小
    周期:延迟1分钟,每1分钟执行一次
    源码如下:
private void startScheduledTask() {
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

五、PullMessageServic.start()

启动拉取服务

相关文章

  • 三、RocketMQ-Producer启动流程

    一、UML图例 二、大致流程说明: 流程为非事务消息流程 在main方法中调用 new DefaultMQProd...

  • Android系统启动过程

    一、系统启动总流程 二、init进程启动流程 三、Zygote启动流程 四、SystemServer启动流程 五、...

  • activiti流程的启动

    流程启动启动之后:产生一个流程实例 流程启动涉及到的三张表

  • 性能优化 -- 启动优化

    启动优化 app启动分为:冷启动;热启动,温启动三种冷启动: 耗时最多流程:冷启动经过的流程点击app --> ...

  • 系统启动和内核管理

    一、Linux组成 二、CentOS6启动流程 三、启动流程 四、系统启动流程 init初始化 五、CentOS ...

  • Activiti工作流框架——HelloWorld

    一、部署流程定义 二、启动流程实例 三、查询当前个人任务 四、完成个人任务 一、部署流程定义 二、启动流程实例 三...

  • 16-系统启动和内核管理

    本章内容 Linux组成 CentOS6 启动流程 CentOS6启动流程 启动流程 启动流程 内核 启动流程 ​...

  • 性能优化:App启动优化

    一、App启动流程及启动优化二、定量监测App启动耗时、定位耗时代码三、果速送App启动优化 一、App启动流程及...

  • 从手机启动到View显示

    Android系统启动流程 Android系统启动流程: Activity启动流程 Activity启动流程: 触...

  • 对AMS中重要概念的理解

    不同启动流程 根据启动流程分为三种 通过Launcher点击app创建新进程,称为 根Activity组件的启动流...

网友评论

    本文标题:三、RocketMQ-Producer启动流程

    本文链接:https://www.haomeiwen.com/subject/jnlpwqtx.html