美文网首页程序员@IT·互联网
RocketMQ学习-NameServer-2

RocketMQ学习-NameServer-2

作者: 程序熊大 | 来源:发表于2018-03-23 00:02 被阅读501次

    上篇文章主要梳理了NameServer的启动器和配置信息,并复习了JVM中的关闭钩子这个知识点。这篇文章看下NameServer的其他模块。建议带着如下三个问题阅读:

    1. NameServer管理哪些信息?如何管理的?
    2. NameServer中对Netty的使用案例?
    3. NameServer中对Java并发编程使用案例?

    一、NamesrvController

    1. 作用:NameServer模块的控制器
    2. 主要属性
      • namesrvConfig:name server的配置信息
      • nettyServerConfig:name server中作为netty服务端的配置
      • scheduledExecutorService:调度线程池,用于:(1)周期性检查broker信息;(2)周期性打印路由信息;这两个检查每隔5秒交替进行。
      • kvConfigManager:name server配置的操作接口
      • routeInfoManager:name server路由信息的操作接口
      • remotingServer:netty服务器
      • brokerHousekeepingService:监听连接的broker的通道的关闭或异常事件,用于清理broker信息;
      • remotingExecutor:服务端处理请求的线程池
    3. 代码如下
    public class NamesrvController {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
        //name server的配置
        private final NamesrvConfig namesrvConfig;
    
        //netty server的配置定义
        private final NettyServerConfig nettyServerConfig;
    
        //创建一个具备调度功能的线程池,该线程池里只有一个线程,用于:(1)周期性检查broker信息;(2)周期性打印路由信息
        private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
            "NSScheduledThread"));
    
        //name server配置的操作接口
        private final KVConfigManager kvConfigManager;
    
        //name server路由信息的操作接口
        private final RouteInfoManager routeInfoManager;
    
        //服务器
        private RemotingServer remotingServer;
    
        //broker信息清理器,监听通道事件
        private BrokerHousekeepingService brokerHousekeepingService;
    
        //服务端处理请求的线程池
        private ExecutorService remotingExecutor;
    
        private Configuration configuration;
       
        //other code....
    }
    
    1. 主要方法

      • initialize:初始化

        public boolean initialize() {
        
                this.kvConfigManager.load();
        
                this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        
                this.remotingExecutor =
                    Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        
                this.registerProcessor();
        
                //服务器启动后5秒,开始每隔10秒检查broker的运行状态
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        
                    @Override
                    public void run() {
                        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                    }
                }, 5, 10, TimeUnit.SECONDS);
        
                //服务器启动后1秒,开始每隔10秒检查
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        
                    @Override
                    public void run() {
                        NamesrvController.this.kvConfigManager.printAllPeriodically();
                    }
                }, 1, 10, TimeUnit.MINUTES);
        
                return true;
            }
        
      • registerProcessor:注册处理器

            //在name server服务器上注册请求处理器,默认是DefaultRequestProcessor
            private void registerProcessor() {
                if (namesrvConfig.isClusterTest()) {
        
                    this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                        this.remotingExecutor);
                } else {
        
                    this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
                }
            }
        

      • 其他还有:构造方法、start方法、shutdown方法

    2. Java并发

      • Executors.newFixedThreadPool(),用于创建固定数量的线程池,根据线程池的运行原理:线程池启动时候没有线程,当新任务到来时就创建线程处理;由于coreSize和maxSize设置为相同大小,如果任务来的时候线程已经达到coreSize,就直接放入等待队列;keepAlive设置为0,目的是让线程数不会超过coreSize;blockqueue设置为LinkedBlockingQueue,表示是无界队列,最多可以放Integer.MAX_VALUE个任务。

        public static ExecutorService newFixedThreadPool(int nThreads,ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                            0L, TimeUnit.MILLISECONDS,
                                            new LinkedBlockingQueue<Runnable>(),
                                            threadFactory);
        }
        
      • 周期线程池

        NameServerController中使用了调度线程池,我们看下创建一个调度线程池的方法,即Executors.newSingleThreadScheduledExecutor(),该方法的定义如下所示:

            public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
                return new DelegatedScheduledExecutorService
                    (new ScheduledThreadPoolExecutor(1, threadFactory));
            }
        

        这种线程池的创建又委托给了DelegatedScheduledExecutorService类,这里为什么这么设计,不是太理解。不过可以看下真正创建调度线程池的代码:

            public ScheduledThreadPoolExecutor(int corePoolSize,
                                               ThreadFactory threadFactory) {
                super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                      new DelayedWorkQueue(), threadFactory);
            }
        

        上面这个方法,关键在于两点:(1)maxSize选了Integer.MAX_VALUE;(2)任务队列使用了延迟队列;再回头去看那个委托类的代码,就可以明白,委托类包装了ScheduledExecutorService执行器,提供了延迟或周期执行的接口。

            /**
             * A wrapper class that exposes only the ScheduledExecutorService
             * methods of a ScheduledExecutorService implementation.
             */
            static class DelegatedScheduledExecutorService
                    extends DelegatedExecutorService
                    implements ScheduledExecutorService {
                private final ScheduledExecutorService e;
                DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
                    super(executor);
                    e = executor;
                }
                public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
                    return e.schedule(command, delay, unit);
                }
                public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
                    return e.schedule(callable, delay, unit);
                }
                public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
                    return e.scheduleAtFixedRate(command, initialDelay, period, unit);
                }
                public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
                    return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
                }
            }
        

        找到上面几个主要类和接口的类图,再综合上面的代码,可以这么理解:Executors是一个工具类,提供了生成不同的线程池的工厂方法,其中包括newSingleThreadScheduledExecutor方法,由于ScheduledExecutorService扩展了ExecutorService接口,同时又想重用AbstractExecutorService中的一些方法,因此需要一个委托类,将ExecutorService和ScheduledExecutorService的功能整合在一个类中。

        ScheduledExecutorService.png
    1. Netty

      RemotingServer是name server中的通信服务端,在name controller初始化name server模块的时候,会将name server的请求处理器注册到netty服务器上。

    二、DefaultRequestProcessor

    在NameServerController中会注册请求处理器,那么name server的请求处理器实现了哪些接口呢,请看代码:

    public class DefaultRequestProcessor implements NettyRequestProcessor {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
        protected final NamesrvController namesrvController;
    
        public DefaultRequestProcessor(NamesrvController namesrvController) {
            this.namesrvController = namesrvController;
        }
    
        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            if (log.isDebugEnabled()) {
                log.debug("receive request, {} {} {}",
                    request.getCode(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    request);
            }
    
            switch (request.getCode()) {
                case RequestCode.PUT_KV_CONFIG:
                    return this.putKVConfig(ctx, request);
                case RequestCode.GET_KV_CONFIG:
                    return this.getKVConfig(ctx, request);
                case RequestCode.DELETE_KV_CONFIG:
                    return this.deleteKVConfig(ctx, request);
                case RequestCode.REGISTER_BROKER:
                    Version brokerVersion = MQVersion.value2Version(request.getVersion());
                    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                        return this.registerBrokerWithFilterServer(ctx, request);
                    } else {
                        return this.registerBroker(ctx, request);
                    }
                case RequestCode.UNREGISTER_BROKER:
                    return this.unregisterBroker(ctx, request);
                case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                    return this.getRouteInfoByTopic(ctx, request);
                case RequestCode.GET_BROKER_CLUSTER_INFO:
                    return this.getBrokerClusterInfo(ctx, request);
                case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                    return this.wipeWritePermOfBroker(ctx, request);
                case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                    return getAllTopicListFromNameserver(ctx, request);
                case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                    return deleteTopicInNamesrv(ctx, request);
                case RequestCode.GET_KVLIST_BY_NAMESPACE:
                    return this.getKVListByNamespace(ctx, request);
                case RequestCode.GET_TOPICS_BY_CLUSTER:
                    return this.getTopicsByCluster(ctx, request);
                case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                    return this.getSystemTopicListFromNs(ctx, request);
                case RequestCode.GET_UNIT_TOPIC_LIST:
                    return this.getUnitTopicList(ctx, request);
                case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                    return this.getHasUnitSubTopicList(ctx, request);
                case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                    return this.getHasUnitSubUnUnitTopicList(ctx, request);
                case RequestCode.UPDATE_NAMESRV_CONFIG:
                    return this.updateConfig(ctx, request);
                case RequestCode.GET_NAMESRV_CONFIG:
                    return this.getConfig(ctx, request);
                default:
                    break;
            }
            return null;
        }
        //其他具体的实现方法
    }
    

    从这个代码中可以看出两个方面的内容:

    1. 如何使用Netty处理网络请求。关键数据结构:(1)RemotingCommand:自定义的协议,携带请求参数和响应(2)ChannelHandlerContext:netty的数据结构,携带channel相关的信息。设计模型:processRequest:通过请求码进行请求转发;
    2. 请求处理方法(跟协议相关,具体参见remote模块)(1)processRequest:请求分发;(2)putKVConfig:将配置信息放在内存中;(3)getKVConfig:返回配置信息(4)deleteKVConfig:删除配置信息;(5)注册broker,支持两个注册方式:带过滤服务的(MQ版本在V3_0_11之后的)、不带过滤服务的,等其他处理方法。

    三、BrokerHousekeepingService

    该模块实现了ChannelEventListener接口,每个broker都会跟name server建立一个连接通道,当这个通道发生异常事件时,需要及时在name server这边清理掉对应的broker信息。异常事件的类型有:(1)通道关闭时;(2)通道抛出异常时;(3)通道空闲时。

    public class BrokerHousekeepingService implements ChannelEventListener {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
        private final NamesrvController namesrvController;
    
        public BrokerHousekeepingService(NamesrvController namesrvController) {
            this.namesrvController = namesrvController;
        }
    
        @Override
        public void onChannelConnect(String remoteAddr, Channel channel) {
        }
    
        @Override
        public void onChannelClose(String remoteAddr, Channel channel) {
            this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
        }
    
        @Override
        public void onChannelException(String remoteAddr, Channel channel) {
            this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
        }
    
        @Override
        public void onChannelIdle(String remoteAddr, Channel channel) {
            this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
        }
    }
    

    四、RouteInfoManager

    这个模块是name server的核心模块,真正管理broker、消息队列等相关信息的地方。代码如下:

    public class RouteInfoManager {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
        private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    
        public RouteInfoManager() {
            this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
            this.brokerAddrTable = new HashMap<String, BrokerData>(128);
            this.clusterAddrTable = new HashMap<String, Set<String>>(32);
            this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
            this.filterServerTable = new HashMap<String, List<String>>(256);
        }
        //对外暴露的方法   
    }
    

    主要属性的含义如下:

    1. BROKER_CHANNEL_EXPIRED_TIME,表示一个broker距离上次发心跳包的最长时间,即120秒;
    2. 使用可重入读写锁实现并发安全、使用轻量级的非线程安全容器实现高效并发;【这点非常重要】
    3. topicQueueTable:用于管理topic和属于这个topic的队列的映射关系;
    4. brokerAddrTable:用于管理某个broker和它对应的信息
    5. clusterAddrTable:用于管理broker集群和集群中对应的broker的映射关系
    6. brokerLiveTable:用于管理broker的存活信息
    7. filterServerTable:用于管理broker和过滤服务列表【暂不理解】

    关于ReentrantReadWriteLock:

    1. 这里使用的锁是非公平锁

    2. ReentrantReadWriteLock基于Sync、ReadLock、WriteLock三个模块实现,Sync负责处理公平与否的问题。ReadLock和WriteLock通过锁外部对象ReentrantReadWriteLock来处理并发。在RoutInfoManager中的使用案例如下:

          public void deleteTopic(final String topic) {
              try {
                  try {
                      this.lock.writeLock().lockInterruptibly();
                      this.topicQueueTable.remove(topic);
                  } finally {
                      this.lock.writeLock().unlock();
                  }
              } catch (Exception e) {
                  log.error("deleteTopic Exception", e);
              }
          }
      

    五、KVConfigManager

    这个模块用于管理name server自己的配置信息,配置信息以json信息存放在文件中,以二维数组形式存在于内存中,请看代码:

    /**
     * 管理NameServer的配置属性
     */
    public class KVConfigManager {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
        private final NamesrvController namesrvController;
    
        //可重入读写锁
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
    
        //配置表
        private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
            new HashMap<String, HashMap<String, String>>();
    
        public KVConfigManager(NamesrvController namesrvController) {
            this.namesrvController = namesrvController;
        }
        //这个类对外暴露的方法,省了
    }
    

    这个类对外暴露的方法有:

    1. load方法:将配置信息加载到内存中
    2. putKVConfig方法:将配置信息持久化
    3. deleteKVConfig方法:删除指定的配置项
    4. getKVListByNamespace和getKVConfig用于查询配置信息

    参考资料

    1. 消息队列技术点梳理
    2. netty的线程模型
    3. 《Java并发编程的艺术》

    相关文章

      网友评论

        本文标题:RocketMQ学习-NameServer-2

        本文链接:https://www.haomeiwen.com/subject/iwbzqftx.html