在之前使用rocketmq时,很多细节并没有了解到。后面直接从网上下载了4.8版本的源码,直接进行阅读。
入口还是很好找的,不过感觉看这块源码,需要一些nio的基础,最好有使用或者了解过Netty的源码,阅读起来会更方便。
感觉直接贴图比较方便一点。下面是namesrv的启动类的位置。
namesrv的启动类
先来说说namesrv是做什么的。这个模块其实相当于一个注册中心的功能,实时监控broker的存活状态,而客户端(生产者 或者消费者)只需要从注册中心,去获取broker的真实地址,便可以直接与Broker进行通信, 进行消息的发送,消费,存储等。
在讲如何启动之前,要先讲一下俩个基本的控制类NamesrvConfig与NettyServerConfig
这俩个类其实就是一个配置类,用于存放从配置文件中读取到的配置项
NamesrvConfig
结构如下
public class NamesrvConfig {
//rocketmqhome的位置,默认通过环境变量获取
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
//kvConfig.json的位置,目前暂时不知道这个文件是做什么的
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
//namesrv.properties配置文件存放的位置
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
//默认不支持消息的有序
private boolean orderMessageEnable = false;
}
NettyServerConfig
其实这一块是底层通信模块,netty的配置。
public class NettyServerConfig implements Cloneable {
//监听的端口,默认是8888
private int listenPort = 8888;
//netty处理业务的线程池
private int serverWorkerThreads = 8;
//处理消息消费,心跳发送,消息发送等。这里是网上找到的概述,暂时不那么清楚作用
private int serverCallbackExecutorThreads = 0;
//维护selector轮询的线程
private int serverSelectorThreads = 3;
//同步发送支持的一次发送的最大消息数
private int serverOnewaySemaphoreValue = 256;
//异步发送支持的一次发送的最大消息数
private int serverAsyncSemaphoreValue = 64;
//通道的最大空闲时间
private int serverChannelMaxIdleTimeSeconds = 120;
//网络发送区域的缓存区大小
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
//网络接收区域的缓存区大小
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
//是否开启缓存
private boolean serverPooledByteBufAllocatorEnable = true;
//linux下要开启epoll Io模型
private boolean useEpollNativeSelector = false;
}
接下来说说NamesrvController
其实里面有一堆成员变量,但是暂时只是猜到做什么的。还没有仔细去看
NamesrvController
public class NamesrvController {
注册中心的配置
private final NamesrvConfig namesrvConfig;
底层通信配置(底层是使用netty)
private final NettyServerConfig nettyServerConfig;
其实还有一些成员变量,但是由于没有不是本次要讲的重点,就先省略......
}
接下来直接从源码入手
启动类是如何启动的?
public class NamesrvStartup {
通过main方法启动
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
先构建controller
NamesrvController controller = createNamesrvController(args);
再启动controller
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
}
从上面的代码看到,还是比较简单的,先构建,再启动,所以下面看一下如何构建
如何构建NamesrvController
代码如下
public class NamesrvStartup {
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
。。。省略
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
监听端口设置为9876(默认)
nettyServerConfig.setListenPort(9876);
如果启动的命令行中,有c参数,指定配置文件,那么就会加载配置文件,并且封装到对应的配置类中
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
读取配置文件,进行加载
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
通过反射,设置到配置类中的成员变量
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
设置配置文件的路径
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
命令行可以输入一些参数,对配置进行覆盖,我们可以通过命令行启动进行覆盖
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
//没有设置环境变量会报错,直接退出程序
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 。。。省略部分代码
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
return controller;
}
}
NamesrvController的构建流程如下:
1.配置类的构建NamesrvConfig,NettyServerConfig。这里主要是涉及到底层通信的配置,以及注册中心文件的一些存放路径
2.命令行如果有指定配置文件的存放路径,则将配置文件解析,并且将配置设置到配置类(NamesrvConfig,NettyServerConfig)的成员变量
3.读取命令行里面的参数,如果有的话,对namesrvConfig的配置,可以进行覆盖
所以可以通过配置文件的方式去设置相关配置,也可以通过命令行的方式去设置相关配置。
NamesrvController是如何启动的?
接下来还是先回到一开始的入口
public class NamesrvStartup {
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
//NamesrvController的构建
NamesrvController controller = createNamesrvController(args);
启动。。。。。。。。。。接下来看看这个方法是如何运行的
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
}
启动方法的代码
public class NamesrvStartup {
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
先进行初始化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
添加一个回调,用于暂停运行的时候,停止controller
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
启动
controller.start();
return controller;
}
}
先来看看如何初始化,代码如下
NamesrvController的初始化
public class NamesrvController {
用于远程通信的服务类,以netty为底层
private RemotingServer remotingServer;
public boolean initialize() {
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
线程池,用于处理远程通信
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
注册处理器,其实看过netty底层的可以理解成一个处理请求的channelHandler,或者说是springmvc里面的dispatcher,用于处理请求的处理器。
this.registerProcessor();
心跳检查,通过定时任务,10秒一次,将失效的broker移除。这里如何移除的话,每个broker都有一个最新更新时间,与当前时间对比,如果超过120秒了,那说明失效了。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
。。。省略部分代码
return true;
}
}
心跳检查的代码
public class RouteInfoManager {
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
超过10秒,则将与broker的通信切断,同时将broker移除
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
}
初始化的过程中,会初始化一系列的组件。
比如远程通信的服务,还有用于远程通信的线程池,以及注册处理远程通信的handler。
另外也会添加心跳检测,用于移除失效的broker。初始化完之后,就是如何启动了
NamesrvController如何启动
代码如下
public class NamesrvController {
public void start() throws Exception {
在初始化的过程中,已经将该服务给初始化了,由于底层是netty,所以那会也用了nettyServerConfig作为参数传入。
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
}
启动的代码如下。
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
看到这一步,nettyServerConfig的配置都刚刚好被使用上了。
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
}
至此,启动的过程就完成了。
namesrv在启动的过程中经历几步。
1.controller的实例化。需要通过配置文件,或者参数,读取相关的配置,如底层netty需要的配置,以及注册中心的配置,并且进行设置
2.controller的初始化。主要是对底层的远程通信服务进行初始化
3.启动,将第二步中的通信服务进去启动即可。
网友评论