NamesrvStartup时序图
NameServer.png
NettyRemoteServer.start()
@Override
public void start() {
//创建一个DefaultEventExecutorGroup, 用于处理netty handler中的操作
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
//初始化Netty:
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
//Channel获取新连接的方式
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
//设置TCP/IP协议listen函数中的backlog参数(用来初始化服务端可连接队列),
.option(ChannelOption.SO_BACKLOG, 1024)
//套接字选项中的SO_REUSEADDR:表示允许重复使用本地地址和端口
.option(ChannelOption.SO_REUSEADDR, true)
//该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。
//设置该选项后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
//接收/发送Buffer大小
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
//编解码
new NettyEncoder(),
new NettyDecoder(),
//心跳检测
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
//连接管Handler,处理connect, disconnect, register, r/w , close等事件
new NettyConnectManageHandler(),
//处理接收到RemotingCommand消息后的事件, 收到服务器端响应后的相关操作
new NettyServerHandler()
);
}
});
//是否使用Netty内存池----默认启用
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
//绑定Netty监听端口
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
//启动 监听Channel各种事件 的线程
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
//每隔1秒扫描 responseTable 中是否有超时未回应的请求
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
参考资料
https://github.com/FrankerSun/RocketMQ-With-Comment
网友评论