美文网首页java 成神之路
RocketMQ 生产者 Producer 启动过程

RocketMQ 生产者 Producer 启动过程

作者: jijs | 来源:发表于2018-12-07 00:10 被阅读41次

    MQProducer

    从类关系中可以看出,MQProducer 有两种实现方式。一个是 DefaultMQProducer,另一个是 TransactionMQProducer。

    • DefaultMQProducer: 我们常用的生产者。
    • TransactionMQProducer:继承自 DefaultMQProducer,并支持事务消息。

    下面我们来分析下 DefaultMQProducer 启动的过程。

    启动示例

    public class Producer {
        public static void main(String[] args) throws UnsupportedEncodingException {
            try {
                DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
                producer.setNamesrvAddr("....");
                ......
                producer.start();
                ......
            }catch(Exception e){}
        }
    }
    

    创建 DefaultMQProducer 实例,然后制定一些参数,调用 start() 方法就开启了生产者。

    DefaultMQProducer 参数分析

    public class DefaultMQProducer extends ClientConfig implements MQProducer {
    
        //producer 组名
        private String producerGroup;
    
        // Topic 名字,默认为“TBW102”
        private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
    
        // 创建 Topic 默认的4个队列
        private volatile int defaultTopicQueueNums = 4;
    
        // 发送消息超时时间
        private int sendMsgTimeout = 3000;
    
        // 当发送的消息大于 4K 时,开始压缩消息。
        private int compressMsgBodyOverHowmuch = 1024 * 4;
    
        //同步发送消息,发送失败时再尝试发送2次数
        private int retryTimesWhenSendFailed = 2;
    
        // 异步发送消息,发送失败时再尝试发送2次数
        private int retryTimesWhenSendAsyncFailed = 2;
    
        //发送broker消息存储失败时,是否尝试去试发送其他的broker
        private boolean retryAnotherBrokerWhenNotStoreOK = false;
    
        //最大允许发送字节数
        private int maxMessageSize = 1024 * 1024 * 4; // 4M
    

    DefaultMQProducer 中定义的类属性

    • producerGroup: 生产者组名
    • createTopicKey :Topic 名字,默认为“TBW102”
    • defaultTopicQueueNums :创建 Topic 默认的4个队列
    • sendMsgTimeout :默认发送消息3秒超时
    • compressMsgBodyOverHowmuch :当发送的消息大于 4K 时,开始压缩消息。
    • retryTimesWhenSendFailed :同步发送消息,发送失败时再尝试发送2次数。
    • retryTimesWhenSendAsyncFailed :异步发送消息,发送失败时再尝试发送2次数
    • retryAnotherBrokerWhenNotStoreOK :发送broker消息存储失败时,是否尝试去试发送其他的broker

    DefaultMQProducer 还有可以设置其他的参数,这里就不说明了。

    Producer 启动

    public void start() throws MQClientException {
        this.defaultMQProducerImpl.start();
    }
    
    public void start() throws MQClientException {
        this.start(true);
    }
    
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            // 1. 只有 serviceState 状态为 CREATE_JUST 时,才启动 Producer
            case CREATE_JUST:
                //2. 防止启动多个 Producer,先把 serviceState 状态修改为 START_FAILED。
                this.serviceState = ServiceState.START_FAILED;
                // 3. 检查 groupName 是否合法
                this.checkConfig();
    
                //4. 判断是否需要设置 InstanceName 。
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                // 5. 构建 MQClientInstance 对象。
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                // 6. 将 DefaultMQProducerImpl 对象注册到 ConcurrentHashMap<String/* group */, MQProducerInner> producerTable 中
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                // 7.以主题名"TBW102"为key值,新初始化的TopicPublishInfo对象为value值存入DefaultMQProducerImpl.topicPublishInfoTable变量中
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                // 8. 启动 第五步创建的 MQClientInstance 实例。
                if (startFactory) {
                    mQClientFactory.start();
                }
    
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                // 9. 设置DefaultMQProducerImpl的ServiceState为RUNNING
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        // 10.  向所有的 broker 发送心跳和上传 FilterClass
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }
    
    1. 启动Producer的时候判断 serviceState 的当前状态,只有 serviceState 状态为 CREATE_JUST 时,才启动 Producer。否则抛出异常信息。

    2、同时防止启动多个 Producer,先把 serviceState 状态修改为 START_FAILED。

    3、 检查 groupName 是否合法。比如不能为空,是否符合正则 ^[%|a-zA-Z0-9_-]+$,并且最大长度不能超过 255(CHARACTER_MAX_LENGTH = 255);
    groupName 也不能等于 DEFAULT_PRODUCER。只要满足上面条件,则抛异常信息。

    4、如果 producerGroup 不等于 CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER" ,然后调用 changeInstanceNameToPID() 方法判断名字不是 "DEFAULT" 则更改 instanceName。

    public void changeInstanceNameToPID() {
        if (this.instanceName.equals("DEFAULT")) {
            this.instanceName = String.valueOf(UtilAll.getPid());
        }
    }
    public static int getPid() {
        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
        String name = runtime.getName(); // format: "pid@hostname"
        try {
            return Integer.parseInt(name.substring(0, name.indexOf('@')));
        .....
    }
    

    5、构建 MQClientInstance 对象。

    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
            String clientId = clientConfig.buildMQClientId();
            MQClientInstance instance = this.factoryTable.get(clientId);
            if (null == instance) {
                instance =
                    new MQClientInstance(clientConfig.cloneClientConfig(),
                        this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
                MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
                if (prev != null) {
                    instance = prev;
                    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
                } else {
                    log.info("Created new MQClientInstance for clientId:[{}]", clientId);
                }
            }
    
            return instance;
        }
    
    • 5.1 首先生成 clientId:ip@instanceName 或 ip@instanceName@unitName
    • 5.2 如果 factoryTable 中是不已经存在 MQClientInstance 实例,则创建。 (下面有单独分析该源码)

    6、将 DefaultMQProducerImpl 对象注册到 ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();

    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    
    public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
        if (null == group || null == producer) {
            return false;
        }
        MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
        if (prev != null) {
            log.warn("the producer group[{}] exist already.", group);
            return false;
        }
        return true;
    }
    

    7、以主题名"TBW102"为key值,新初始化的TopicPublishInfo对象为value值存入DefaultMQProducerImpl.topicPublishInfoTable变量中

    8、调用 第五步创建的 MQClientInstance 实例 的start() 方法。
    该方法做了很多事情:

    • 获取NameServer地址
    • 启动 Netty 客户端服务
    • 设置多个定时任务
    • 开启 pullMessageService 服务
    • 开启 rebalanceService 服务
    • 开启 发送消息服务

    下面有具体代码分析MQClientInstance.start() 方法。

    9、设置DefaultMQProducerImpl的ServiceState为RUNNING
    10、向所有的 broker 发送心跳和上传 FilterClass

    创建MQClientInstance实例(第5.2步)

    上面 5.2 步骤中创建MQClientInstance 的代码如下:

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
        // Netty 中注册接收请求的处理器。
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
        //设置 NameServer 地址。
        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
        }
        // 客户端ID
        this.clientId = clientId;
        //创建 MQAdminImpl 对象进行和 NameServer 进行交互,比如创建Topic、获取 Queue等
        this.mQAdminImpl = new MQAdminImpl(this);
        // 创建 pullMessageService 服务
        this.pullMessageService = new PullMessageService(this);
        // 创建 rebalanceService  服务
        this.rebalanceService = new RebalanceService(this);
        // 创建 DefaultMQProducer 服务
        this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
        this.defaultMQProducer.resetClientConfig(clientConfig);
        // 开启 Comsumer 统计服务
        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
    
        log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
            this.instanceIndex,
            this.clientId,
            this.clientConfig,
            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
    }
    

    主要功能:

    • 创建 MQAdminImpl 对象进行和 NameServer 进行交互,比如创建Topic、获取 Queue等
    • 创建 pullMessageService 服务
    • 创建 rebalanceService 服务,供 Consumer 端使用
    • 创建 DefaultMQProducer 服务,
    • 开启 Comsumer 统计服务。统计最近一段时间内,消费成功个数、消费失败个数等信息。

    启动MQClientInstance 服务 (第8步)

    public void start() throws MQClientException {
    
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // 1. 如果配置NameServer地址,则从默认服务器地址中获取(该地址不可改变)
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // 2. 启动 Netty 客户端服务
                    this.mQClientAPIImpl.start();
                    // 3. 设置多个定时任务
                    this.startScheduledTask();
                    // 4. 开启 pullMessageService 服务
                    this.pullMessageService.start();
                    // 5. 开启 rebalanceService 服务
                    this.rebalanceService.start();
                    // 6. 开启 发送消息服务
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
    

    主要这几步骤操作。

    • 1、获取NameServer地址
      如果启动 Producer 时没有指定 NameServer,则程序会向一个Http地址发送请求来获取NameServer地址。通过这种方式可以动态的配置 NameServer。从而达到动态增加和删除NameServer服务。
    public static String getWSAddr() {
        String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
        String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
        String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
        if (wsDomainName.indexOf(":") > 0) {
            wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
        }
        return wsAddr;
    }
    
    • 2、启动 Netty 客户端服务
    • 3、调用startScheduledTask() 方法设置多个定时任务
    • 4、开启 pullMessageService 服务
    • 5、开启 rebalanceService 服务
    • 6、开启 发送消息服务

    startScheduledTask() 方法:定时任务

    private void startScheduledTask() {
        // 1.如果 NameServer 地址默认没配置,则定时向一个Http地址获取 
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
    
        // 2. 定时的从 NameServer 中获取 Topic、broker、queue 相关信息
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    
        // 3. 定时清理无效的Broker,并向所有的Broker 发送心跳数据
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    
        // 4. 定时的持久化 Consumer 端消费每个 queue的 offset 数据。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
        // 5. 调整消费端的线程数
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }
    
    • 1、定时更新 NameServer 地址
      每个2分钟,程序会向一个Http地址发送请求来获取NameServer地址来动态更新NameServer地址。

    • 2、 定时的从 NameServer 中获取 Topic、broker、queue 相关信息
      默认每隔 30秒去 NameServer 中获取Topic、broker、queue等相关信息。
      如果有新broker注册或下线,producer端会在30秒之内感知。

    • 3、定时清理无效的Broker,并向所有的Broker 发送心跳数据.
      默认每隔 30 秒向 Broker 发送心跳数据 和 用户自定义的 filterclass 类。

    • 4、定时的持久化 Consumer 端消费每个 queue的 offset 数据。
      默认每隔 5 秒持久或 Consumer 消费的 queue 的 offset信息。
      持久化分为,远程持久化和本地持久化。
      MessageModel.CLUSTERING 模式 queue的offset 保存到 broker上。
      BROADCASTING("BROADCASTING") 模式 queue 的 offset 保存在本地。

    • 5、调整消费端的线程数
      每隔 1 分钟计算每一个queue中消息挤压的数量,如果超过100000条,则增加消费线程的并发数,如果小于80000条则减少消费者的线程数。
      不过进入源码中看,调整消费者的线程数都注释掉了。


    相关文章

      网友评论

        本文标题:RocketMQ 生产者 Producer 启动过程

        本文链接:https://www.haomeiwen.com/subject/motzcqtx.html