RocketMQ Producer Startup Flow

Author: 晴天哥_王志 | Published: 2020-05-02 18:22

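    Series

    Introduction

    • This series introduces how the RocketMQ producer works and how to use it. It covers the producer's startup flow, route synchronization, and message sending flow.

    • This article covers the producer's startup flow: the startup sequence itself, the clients involved, and the scheduled tasks.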

    Producer startup flow

    • Starting a producer begins with creating a DefaultMQProducer object, which internally creates a DefaultMQProducerImpl object.

    • DefaultMQProducer#start calls DefaultMQProducerImpl#start, which obtains an MQClientInstance through MQClientManager (creating one if none exists yet for the client ID).

    • MQClientInstance#start is the core of the producer startup flow: it starts all services and all scheduled tasks.

    • MQClientInstance#start calls mQClientAPIImpl.start(), which starts the NettyRemotingClient.

    • MQClientInstance#start calls startScheduledTask, which starts the scheduled tasks, including the one that fetches route information.

    • The tasks scheduled by startScheduledTask run fetchNameServerAddr, updateTopicRouteInfoFromNameServer, cleanOfflineBroker, sendHeartbeatToAllBrokerWithLock, persistAllConsumerOffset, and adjustThreadPool.

    • MQClientInstance#start calls pullMessageService.start(), which starts the PullMessageService.

    • MQClientInstance#start calls rebalanceService.start(), which starts the RebalanceService.

    • MQClientInstance is the producer's core object and starts almost every module. Consumers use the same object, so starting MQClientInstance also starts some consumer-side services (such as PullMessageService and RebalanceService).
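
The delegation chain described above can be sketched with plain Java classes. This is a simplified model, not RocketMQ code: `SketchProducer`, `SketchProducerImpl`, and `SketchClientInstance` are hypothetical stand-ins for DefaultMQProducer, DefaultMQProducerImpl, and MQClientInstance, and the client instance is passed in directly instead of being looked up through MQClientManager.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MQClientInstance: records the order in which services start.
class SketchClientInstance {
    final List<String> startLog = new ArrayList<>();
    private boolean started = false;

    synchronized void start() {
        if (started) {
            return; // already running, nothing to do
        }
        startLog.add("remotingClient");     // corresponds to mQClientAPIImpl.start()
        startLog.add("scheduledTasks");     // corresponds to startScheduledTask()
        startLog.add("pullMessageService"); // corresponds to pullMessageService.start()
        startLog.add("rebalanceService");   // corresponds to rebalanceService.start()
        started = true;
    }
}

// Hypothetical stand-in for DefaultMQProducerImpl: starts the shared client instance.
class SketchProducerImpl {
    private final SketchClientInstance client;

    SketchProducerImpl(SketchClientInstance client) {
        this.client = client;
    }

    void start() {
        client.start();
    }
}

// Hypothetical stand-in for DefaultMQProducer: a thin facade over the impl object.
class SketchProducer {
    private final SketchProducerImpl impl;

    SketchProducer(SketchClientInstance client) {
        this.impl = new SketchProducerImpl(client);
    }

    void start() {
        impl.start();
    }
}
```

Calling `new SketchProducer(client).start()` fills `client.startLog` in the same order in which MQClientInstance#start starts its services.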

    Producer startup source code analysis

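    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            // 1. Create the producer object
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("localhost:9876");

            // 2. Start the producer
            producer.start();

            // 3. Send messages with the producer
            for (int i = 0; i < 1000; i++) {
                try {
                    Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                    );

                    SendResult sendResult = producer.send(msg);

                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }

            // Release the producer's resources once all messages have been sent
            producer.shutdown();
        }
    }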
    
    • Using a producer takes three steps: create the producer object, start it, and send messages. Only the first two steps are analyzed here.
    • new DefaultMQProducer("producer_group_name") creates the producer object.
    • producer.start() starts the producer object.

    DefaultMQProducer

    public class DefaultMQProducer extends ClientConfig implements MQProducer {
    
        protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
    
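        public DefaultMQProducer(final String producerGroup) {
            // Delegates to the (namespace, producerGroup, rpcHook) overload, which is omitted from this excerpt
            this(null, producerGroup, null);
        }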
    
        public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
            final String customizedTraceTopic) {
            this.producerGroup = producerGroup;
            defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    
            if (enableMsgTrace) {
                try {
                    AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
                    dispatcher.setHostProducer(this.defaultMQProducerImpl);
                    traceDispatcher = dispatcher;
                    this.defaultMQProducerImpl.registerSendMessageHook(
                        new SendMessageTraceHookImpl(traceDispatcher));
                } catch (Throwable e) {
                    log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
                }
            }
        }
    }
    
    • The DefaultMQProducer constructor creates the DefaultMQProducerImpl object.
    public class DefaultMQProducer extends ClientConfig implements MQProducer {
    
        protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
    
        public void start() throws MQClientException {
            this.setProducerGroup(withNamespace(this.producerGroup));
            // Start defaultMQProducerImpl
            this.defaultMQProducerImpl.start();
            if (null != traceDispatcher) {
                try {
                    traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
                } catch (MQClientException e) {
                    log.warn("trace dispatcher start failed ", e);
                }
            }
        }
    }
    
    • DefaultMQProducer#start calls defaultMQProducerImpl.start().
    public class DefaultMQProducerImpl implements MQProducerInner {
    
        private MQClientInstance mQClientFactory;
    
        public void start() throws MQClientException {
            this.start(true);
        }
    
        public void start(final boolean startFactory) throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
    
                    this.checkConfig();
    
                    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                        this.defaultMQProducer.changeInstanceNameToPID();
                    }
                    // 1. Get or create the MQClientInstance object
                    this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
    
                    boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }
    
                    this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
    
                    if (startFactory) {
                        // 2. Start the MQClientInstance
                        mQClientFactory.start();
                    }
    
                    log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                        this.defaultMQProducer.isSendMessageWithVIPChannel());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The producer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }
    
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        RequestFutureTable.scanExpiredRequest();
                    } catch (Throwable e) {
                        log.error("scan RequestFutureTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
        }
    }
    
    public class MQClientManager {
        private final static InternalLogger log = ClientLogger.getLog();
        private static MQClientManager instance = new MQClientManager();
        private AtomicInteger factoryIndexGenerator = new AtomicInteger();
        private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
            new ConcurrentHashMap<String, MQClientInstance>();
    
        public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
            String clientId = clientConfig.buildMQClientId();
            MQClientInstance instance = this.factoryTable.get(clientId);
            if (null == instance) {
                instance =
                    new MQClientInstance(clientConfig.cloneClientConfig(),
                        this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
                MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
                if (prev != null) {
                    instance = prev;
                    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
                } else {
                    log.info("Created new MQClientInstance for clientId:[{}]", clientId);
                }
            }
    
            return instance;
        }
    }
    
    • MQClientManager.getInstance().getOrCreateMQClientInstance() returns the MQClientInstance registered for the client ID, creating it if it does not exist yet.
    • mQClientFactory.start() invokes MQClientInstance#start, which starts the MQClientInstance.
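
The get-or-create logic in getOrCreateMQClientInstance above relies on `ConcurrentMap.putIfAbsent` so that concurrent callers with the same client ID end up sharing one instance. A minimal sketch of that pattern follows; `SharedClient` and `SharedClientRegistry` are hypothetical names standing in for MQClientInstance and MQClientManager, and the client IDs used in the usage note are made-up sample values.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for MQClientInstance.
class SharedClient {
    final int index;
    final String clientId;

    SharedClient(int index, String clientId) {
        this.index = index;
        this.clientId = clientId;
    }
}

// Hypothetical stand-in for MQClientManager: one shared client per client ID.
class SharedClientRegistry {
    private final AtomicInteger indexGenerator = new AtomicInteger();
    private final ConcurrentMap<String, SharedClient> table = new ConcurrentHashMap<>();

    SharedClient getOrCreate(String clientId) {
        SharedClient client = table.get(clientId);
        if (client == null) {
            SharedClient candidate = new SharedClient(indexGenerator.getAndIncrement(), clientId);
            // If another thread registered a client first, putIfAbsent returns it
            // and the candidate created here is discarded.
            SharedClient previous = table.putIfAbsent(clientId, candidate);
            client = (previous != null) ? previous : candidate;
        }
        return client;
    }

    int size() {
        return table.size();
    }
}
```

Two calls such as `registry.getOrCreate("10.0.0.1@1234")` return the same object, while a different client ID produces a separate one. This is why several producers and consumers in one process can share a single MQClientInstance.
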
    public class MQClientInstance {
    
        public void start() throws MQClientException {
    
            synchronized (this) {
                switch (this.serviceState) {
                    case CREATE_JUST:
                        this.serviceState = ServiceState.START_FAILED;
                        // If not specified,looking address from name server
                        if (null == this.clientConfig.getNamesrvAddr()) {
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }
                        // Start request-response channel
                        this.mQClientAPIImpl.start();
                        // Start various schedule tasks
                        this.startScheduledTask();
                        // Start pull service
                        this.pullMessageService.start();
                        // Start rebalance service
                        this.rebalanceService.start();
                        // Start push service
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    default:
                        break;
                }
            }
        }
    }
    
    • mQClientAPIImpl.start() starts the Netty client that the producer uses for network communication.
    • startScheduledTask() starts the producer's scheduled tasks.
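
The serviceState switch in MQClientInstance#start above sets the state to START_FAILED before doing any work and only moves it to RUNNING at the end. In the excerpt, a second call on a running instance falls through to `default` and does nothing, while a call after a failed start throws. The sketch below models that guard under simplified assumptions; `GuardedService` is a hypothetical class, and it throws `IllegalStateException` where the original throws MQClientException.

```java
// Hypothetical sketch of the serviceState guard; not a RocketMQ class.
class GuardedService {
    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;
    private final Runnable startup;
    int startCount = 0;

    GuardedService(Runnable startup) {
        this.startup = startup;
    }

    synchronized void start() {
        switch (state) {
            case CREATE_JUST:
                // Assume failure first: if startup throws, the state stays START_FAILED.
                state = State.START_FAILED;
                startup.run();
                startCount++;
                state = State.RUNNING;
                break;
            case START_FAILED:
                throw new IllegalStateException("a previous start attempt failed");
            default:
                // RUNNING (or SHUTDOWN_ALREADY): repeated start calls are ignored.
                break;
        }
    }

    synchronized State state() {
        return state;
    }
}
```

With this guard, calling `start()` twice on a healthy service runs the startup logic once, which matches the behavior a shared client instance needs when several producers start it. Note that DefaultMQProducerImpl#start is stricter in the excerpt above: it throws when called in the RUNNING state.
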
    public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
       
        private static final long LOCK_TIMEOUT_MILLIS = 3000;
        private final NettyClientConfig nettyClientConfig;
        private final Bootstrap bootstrap = new Bootstrap();
    
        public void start() {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyClientConfig.getClientWorkerThreads(),
                new ThreadFactory() {
    
                    private AtomicInteger threadIndex = new AtomicInteger(0);
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                    }
                });
    
            Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
                .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
                .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        if (nettyClientConfig.isUseTLS()) {
                            if (null != sslContext) {
                                pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                                log.info("Prepend SSL handler");
                            } else {
                                log.warn("Connections are insecure as SSLContext is null!");
                            }
                        }
                        pipeline.addLast(
                            defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                            new NettyConnectManageHandler(),
                            new NettyClientHandler());
                    }
                });
    
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        NettyRemotingClient.this.scanResponseTable();
                    } catch (Throwable e) {
                        log.error("scanResponseTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
    
            if (this.channelEventListener != null) {
                this.nettyEventExecutor.start();
            }
        }
    }
    
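    • NettyRemotingClient#start starts the Netty client: it creates the worker thread group, configures the Bootstrap and its handler pipeline, and schedules a timer that calls scanResponseTable every second after an initial 3-second delay.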
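
The body of scanResponseTable is not shown in the excerpt above. The sketch below only illustrates the general idea of a periodic scan that drops timed-out pending requests; it assumes that each request has an absolute deadline, and `PendingRequestTable` with its methods is a hypothetical class, not the RocketMQ implementation.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical table of in-flight requests, keyed by request id.
class PendingRequestTable {
    // request id -> absolute deadline in milliseconds
    private final Map<Integer, Long> deadlines = new ConcurrentHashMap<>();

    void register(int requestId, long nowMillis, long timeoutMillis) {
        deadlines.put(requestId, nowMillis + timeoutMillis);
    }

    // Removes every request whose deadline has passed and returns how many were removed.
    int scanExpired(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<Integer, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() <= nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int pending() {
        return deadlines.size();
    }
}
```

In a setup like the one above, a `java.util.Timer` would call `scanExpired(System.currentTimeMillis())` on a fixed schedule. Passing the current time as a parameter keeps the scan easy to test without waiting for real time to pass.
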
    public class MQClientInstance {
    
        private void startScheduledTask() {
            if (null == this.clientConfig.getNamesrvAddr()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                    @Override
                    public void run() {
                        try {
                            MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                        } catch (Exception e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                    } catch (Exception e) {
                        log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                    }
                }
            }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.cleanOfflineBroker();
                        MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                    } catch (Exception e) {
                        log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                    }
                }
            }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.persistAllConsumerOffset();
                    } catch (Exception e) {
                        log.error("ScheduledTask persistAllConsumerOffset exception", e);
                    }
                }
            }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.adjustThreadPool();
                    } catch (Exception e) {
                        log.error("ScheduledTask adjustThreadPool exception", e);
                    }
                }
            }, 1, 1, TimeUnit.MINUTES);
        }
    }
    
    • updateTopicRouteInfoFromNameServer refreshes the producer's route information, which is used when sending messages.
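
Every task in startScheduledTask wraps its body in try/catch. One likely reason is that `ScheduledExecutorService.scheduleAtFixedRate` suppresses all later executions once a run throws, so an uncaught exception would silently stop the task. The sketch below shows a periodic task that fails on every run but keeps being rescheduled because the exception is caught; `PeriodicTaskSketch`, the delays, and the exception message are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicTaskSketch {
    // Schedules a task that fails on every run, waits until it has run `runs` times,
    // and returns the number of executions observed.
    static int runFailingTask(int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger executions = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    executions.incrementAndGet();
                    throw new IllegalStateException("simulated route update failure");
                } catch (Exception e) {
                    // Swallow (or log) the failure so the next run still happens.
                } finally {
                    done.countDown();
                }
            }, 10, 20, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return executions.get();
    }
}
```

`PeriodicTaskSketch.runFailingTask(3)` returns at least 3 even though every run throws. Without the inner try/catch, the task would run only once.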


        Article link: https://www.haomeiwen.com/subject/pcmkghtx.html