美文网首页
(01)RocketMQ - NameServer漫谈源码一

(01)RocketMQ - NameServer漫谈源码一

作者: superleo_ef3c | 来源:发表于2020-03-12 17:06 被阅读0次

缘起

再一次面试过程中,和蔼可亲的面试官在聊完Kafka后,转而继续问RocketMQ的相关细节,因此就再特地的想详细的看看相关的具体实现原理。本文并非严格意义上的源码流程分析,涉及到的技术都会学习和研究一下。

早些年RocketMQ虽然开源,但是阿里内部使用的版本和社区版本还是有点区别,但现在RocketMQ已经入驻Apache基金委员会了,而且顺利孵化毕业。我从官网查到NameServerRocketMQ中的作用主要有以下2点(摘抄自官网):

  • Broker Management: NameServer accepts the register from Broker cluster and provides heartbeat mechanism to check whether a broker is alive.

  • Routing Management: each NameServer will hold whole routing info about the broker cluster and the queue info for clients query.

通过上述的描述,我们大体可以知道NameServer并不涉及过多的核心消息服务,充当的是Peer之间的协议与同步任务。

按照阿里系一贯的架构风格,比如dubbo就是注册中心 - 服务提供者- 服务消费者 三者之间的注册中心就等价于NameServer,只是dubbo官方推荐使用的是zookeeper作为注册中心,而RocketMQNameServer则是基于Netty来实现的。

NameServer的初体验

我们先从官方的例子摘出启动NameServer的代码:


public class NameServerInstanceTest {

    /**

    * 临时使用RocketMQ的内部的日志类

    */

    private static final InternalLogger LOG = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    public static void main(String[] args) {

        final int processID = getProcessID();

        final String uuid = UUID.randomUUID().toString();

        LOG.info("准备启动RocketMQ, 服务类型[type=MQ], 进程号是[pid={}], 全局[id={}]", processID, uuid);

        // NameServerConfig 的设置

        final NamesrvConfig namesrvConfig = new NamesrvConfig();

        // NettyServerConfig 的设置

        final NettyServerConfig nettyServerConfig = new NettyServerConfig();

        // Netty的监听设置端口

        nettyServerConfig.setListenPort(9876);

        // 创建 NameServerController

        NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);

        // 初始化

        boolean initialized = namesrvController.initialize();

        if (initialized) {

            try {

                // 启动

                namesrvController.start();

            } catch (Exception ex) {

                String errorMsg = String.format("启动RocketMQ失败, 服务类型[type=MQ], 进程号是[pid=%d], 全局[id=%s]", processID, uuid);

                LOG.error(errorMsg, ex);

                namesrvController.shutdown();

            }

        }

        // 注册个停机钩子, 确保能够优雅关机

        Runtime.getRuntime()

                .addShutdownHook(new Thread(() -> {

                    namesrvController.shutdown();

                    LOG.info("关闭RocketMQ成功, 服务类型[type=MQ], 进程号是[pid={}], 全局[id={}]", processID, uuid);

                }));

    }

    /**

    * 获取当前Java进程ID

    *

    * @return 返回当前Java进程ID

    */

    private static int getProcessID() {

        RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();

        return Integer.parseInt(runtimeMXBean.getName().split("@")[0]);

    }

}

在代码执行到namesrvController.initialize()之前都是各种内部配置的准备工作,进入namesrvController.initialize()内部,我们会发现做了不少准备工作操作,其中有两个线程已经启动了:


public boolean initialize() {

    // 加载KVConfigManager

    this.kvConfigManager.load();

    // 初始化但不启动Netty

    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    /**

    *  这个指的是对于RocketMQ自定义注册事件之外的漏网之鱼,有一个默认的处理器去处理

    *

    *  @see org.apache.rocketmq.client.impl.MQClientAPIImpl

    */

    this.remotingExecutor =

        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    // 线程一: 定期扫描不活跃的broker

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        // 每个NameServer节点都只扫描自己的链路信息,集群内的NameServer节点相互不会同步数据

        @Override

        public void run() {

                NamesrvController.this.routeInfoManager.scanNotActiveBroker();

        }

    }, 5, 10, TimeUnit.SECONDS);

    // 线程二: 定期打印KVConfigManager的信息

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            // 为了方便观察输出, 临时改成5秒输出一次

            @Override

            public void run() {

                NamesrvController.this.kvConfigManager.printAllPeriodically();

            }

    }, 1, 5, TimeUnit.SECONDS);

    // SSL启动配置忽略

}

从这初步也能看出,所谓RocketMQNameServer集群中的节点其实都是相互之间不共享任何数据并且是无状态,官网明确提到了该架构下的生产者和消费者由于都往代码中指定的所有NameServer发送心跳信息,因此这些生产者和消费者也能从任意的NameServer获取到全部的元数据信息。

这里需要注意的就是,NameServer的个数决定了心跳数的个数。比如1个NameServer,3个broker,2个producer和5个consumer的部署架构,就是每次1*10=10个心跳包,此时如果NameServer为了避免单点改成了3个,那就是3*10=30个心跳包了。对于天然就是为了解决C10K问题的Netty来说,单机支撑几千个心跳包数据量的连接不是问题,所以设计简洁的的NameServer一般不会成为集群的瓶颈。

KVConfigManager的实现

KVConfigManager主要作用就是对基于存放key=namespace,value=HashMap的一个Hash表configTable进行读写操作。


public class KVConfigManager {



    private final NamesrvController namesrvController;

    // 读写锁

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    // 配置Hash表

    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =

        new HashMap<String, HashMap<String, String>>();



    // 对configTable的操作部分先省略

    ...   

}       

可以通过RemotingCommand(id=RequestCode.PUT_KV_CONFIG)命令发送到NameServer,可以实时对其configTable进行put操作,增加Key-Value键值对,然后在下一次其他节点发送心跳命令RemotingCommand(id=RequestCode.REGISTER_BROKER)后,把设置好的这些Key-Value作为Body返回回去。


// DefaultRequestProcessor的processRequest方法片段

// 从configTable取出该namespace的Key-Value值

byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);

// 然后设置到响应的body中

response.setBody(jsonValue);

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return response;

通过源码,发现涉及到namespace的参数都是hard-code成了一个静态常量NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG="ORDER_TOPIC_CONFIG",由于注释和文档有限,这里只能是猜测先前的设计是可以通节点之间通信的RemotingCommand,对configTable进行写操作,然后在返回给,比如按照namespace来对brokerproducerconsumer进行隔离,但可能是这个需求又不需要或者计划以后实现就搁浅了。至少是目前我看的4.7.*版本,还是这样。但是

刚刚提到内部采用了JUC的读写锁ReadWriteLock结合普通的HashMap来实现,那么现在问题来了,为什么不直接使用ConcurrentHashMap来实现呢?

我们先看一下RocketMQ是如何使用读写锁ReadWriteLock的:


// 设置或更新KV值

public void putKVConfig(final String namespace, final String key, final String value) {

    try {

        // 使用可以中断的写锁

        this.lock.writeLock().lockInterruptibly();

        try {

            HashMap<String, String> kvTable = this.configTable.get(namespace);

            // 如果configTable为空, 则创建一个新的

            if (null == kvTable) {

                kvTable = new HashMap<String, String>();

                this.configTable.put(namespace, kvTable);

                log.info("putKVConfig create new Namespace {}", namespace);

            }

            // 把最新的值添加或覆盖进去

            final String prev = kvTable.put(key, value);

            if (null != prev) {

                log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}",

                    namespace, key, value);

            } else {

                log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}",

                    namespace, key, value);

            }

        } finally {

            this.lock.writeLock().unlock();

        }

    } catch (InterruptedException e) {

        log.error("putKVConfig InterruptedException", e);

    }

    // 持久化到硬盘上

    this.persist();

}



// 持久化操作

public void persist() {

    try {

        this.lock.readLock().lockInterruptibly();

        try {

            // 在FastJSON上进行了封装

            KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();

            kvConfigSerializeWrapper.setConfigTable(this.configTable);

            String content = kvConfigSerializeWrapper.toJson();

            // 写入磁盘操作

            if (null != content) {

                // 写入磁盘的逻辑是

                // 1: 用内存中的configTable生成一个临时文件tmp,同时把旧的kvConfig.json中的内容读取出来,写入到一个bak的备份文件

                // 2: 把旧的kvConfig.json文件删除

                // 3: 把最新的tmp文件重名回kvConfig.json

                MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());

            }

        } catch (IOException e) {

            log.error("persist kvconfig Exception, "

                + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);

        } finally {

            this.lock.readLock().unlock();

        }

    } catch (InterruptedException e) {

        log.error("persist InterruptedException", e);

    }

}   

从这里可以看出,使用读写锁ReentrantReadWriteLock而不是ConcurrentHashMap是因为,除了对集合Map操作外,还需要进行诸如序列化、磁盘同步回写等操作。那为什么不用synchronized关键字呢,不是说已经得到增强,在高JDK版本下性能反而更好?因为从代码可以看出代码在尝试获取锁的时候是可以允许中断的,而synchronized关键字等待则是不能被中断。所以对于这样的并发场景下,框架开发人员采用了ReentrantReadWriteLock

相关文章

网友评论

      本文标题:(01)RocketMQ - NameServer漫谈源码一

      本文链接:https://www.haomeiwen.com/subject/dqmmjhtx.html