美文网首页RocketMQ
三、RocketMQ-Producer启动流程

三、RocketMQ-Producer启动流程

作者: ASD_92f7 | 来源:发表于2019-04-15 18:12 被阅读5次

    一、UML图例

    producer启动时序图

    二、大致流程说明:

    流程为非事务消息流程

    1. 在main方法中调用 new DefaultMQProducer(... ...).start(true);
    2. DefaultMQProducerImpl调用checkConfig()方法检查groupName等参数是否合法,然后调用MQClientManagergetAndCreateMQClientInstance()registerProducer()方法,实例化一个MQClientInstance并将new出来的DefaultMQProducer注册到MQClientInstance中(放到了producerTable这个map中)
    3. MQClientInstance中,依次调用
    • MQClientAPIImpl.start()
      启动 remotingClient,request-response channel,即通过netty与name server和broker连接
    • this.startScheduledTask()
      Start various schedule tasks ,启动定时任务,定时更新remotingClient的NameSrv的地址等配置信息
    • PullMessageServic.start()
      Start pull service,启动消息拉取客户端
    • RebalanceService.start()
      Start rebalance service,启动负载均衡服务
    • this.sendHeartbeatToAllBrokerWithLock()
      向broker发送心跳,并拉取配置更新remotingClient
    • DefaultMQProducerImpl.start(false)
      再次调用DefaultMQProducerImpl的start方法,不过不再进行初始化
      这里为什么这么写,后续理解了再进行补充,反正是连上后,发送了两次心跳
    • this.sendHeartbeatToAllBrokerWithLock()
      向broker发送心跳,并拉取配置更新remotingClient

    三、深入MQClientInstance

    1、mQClientAPIImpl.start()

    image.png

    先看源码:

    • 这货就新建了一个netty client
    • scheduleAtFixedRate启动了一个定时 :
      This method is periodically invoked to scan and expire deprecated request,大意是周期性地调用,用来扫描并让废弃的请求过期
    • this.nettyEventExecutor.start()
      启动netty event监听,暂时不太清除干啥使的,后续补充!
    public void start() {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyClientConfig.getClientWorkerThreads(),
                new ThreadFactory() {
    
                    private AtomicInteger threadIndex = new AtomicInteger(0);
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                    }
                });
    
            Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
                .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
                .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        if (nettyClientConfig.isUseTLS()) {
                            if (null != sslContext) {
                                pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                                log.info("Prepend SSL handler");
                            } else {
                                log.warn("Connections are insecure as SSLContext is null!");
                            }
                        }
                        pipeline.addLast(
                            defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                            new NettyConnectManageHandler(),
                            new NettyClientHandler());
                    }
                });
    
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        NettyRemotingClient.this.scanResponseTable();
                    } catch (Throwable e) {
                        log.error("scanResponseTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
    
            if (this.channelEventListener != null) {
                this.nettyEventExecutor.start();
            }
        }
    

    四、MQClientInstance.startScheduledTask()

    定义了一批定时任务:


    定时任务
    • fetchNameServerAddr
      如果启动的时候没有指定Name Server的地址,则会尝试通过http等其他的方式获取
      周期:延迟10秒,2分钟一次
    • updateTopicRouteInfoFromNameServer
      从Name Server获取Topic及Route信息,并更新
      周期:延迟10毫秒,每30秒一次
    • cleanOfflineBroker,sendHeartbeatToAllBrokerWithLock
      清理以经下线的broker,发送心跳
      周期:延迟1秒,每30秒一次
    • persistAllConsumerOffset
      持久化 offset
      周期:延迟10秒,每5秒执行一次
    • adjustThreadPool
      调整线程池大小
      周期:延迟1分钟,每1分钟执行一次
      源码如下:
    private void startScheduledTask() {
            if (null == this.clientConfig.getNamesrvAddr()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                        } catch (Exception e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                    } catch (Exception e) {
                        log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                    }
                }
            }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.cleanOfflineBroker();
                        MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                    } catch (Exception e) {
                        log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                    }
                }
            }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.persistAllConsumerOffset();
                    } catch (Exception e) {
                        log.error("ScheduledTask persistAllConsumerOffset exception", e);
                    }
                }
            }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.adjustThreadPool();
                    } catch (Exception e) {
                        log.error("ScheduledTask adjustThreadPool exception", e);
                    }
                }
            }, 1, 1, TimeUnit.MINUTES);
        }
    

    五、PullMessageServic.start()

    启动拉取服务

    相关文章

      网友评论

        本文标题:三、RocketMQ-Producer启动流程

        本文链接:https://www.haomeiwen.com/subject/jnlpwqtx.html